ZEPPELIN-3254. Enable one build of zeppelin to work with multiple versions of spark
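At a high level, the change parameterizes the Spark system tests over a list of Spark versions and downloads each distribution on demand, so a single Zeppelin build is exercised against all of them. A minimal sketch of that pattern (JUnit 4; `SparkDownloadUtils.downloadSpark` is added by this PR, while the class and test names here are illustrative):

```java
import java.io.File;
import java.util.Arrays;
import java.util.List;

import org.apache.zeppelin.interpreter.SparkDownloadUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

// Sketch only: the test class is instantiated once per Spark version below.
@RunWith(Parameterized.class)
public class MultiSparkVersionTest {

  // One row per Spark version the single Zeppelin build must support.
  @Parameterized.Parameters
  public static List<Object[]> data() {
    return Arrays.asList(new Object[][]{
        {"2.2.1"}, {"2.1.2"}, {"2.0.2"}, {"1.6.3"}
    });
  }

  private final String sparkHome;

  public MultiSparkVersionTest(String sparkVersion) throws Exception {
    // Downloads (and caches) the requested Spark distribution and returns
    // its home directory; SparkDownloadUtils comes from this PR.
    this.sparkHome = SparkDownloadUtils.downloadSpark(sparkVersion);
  }

  @Test
  public void sparkDistributionIsAvailable() {
    // A real test would point the spark interpreter setting at sparkHome and
    // run paragraphs, as ZeppelinSparkClusterTest does in the diff below.
    Assert.assertTrue(new File(sparkHome).exists());
  }
}
```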
### What is this PR for?
This PR adds tests for making one build of Zeppelin work with multiple versions of Spark. There are two main system tests:
* `SparkIntegrationTest` (launches the Spark interpreter from the interpreter setting component in three modes: local, yarn-client, yarn-cluster)
* `ZeppelinSparkClusterTest` (launches the Spark interpreter in local mode from the paragraph side)

### What type of PR is it?
[Improvement]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3254

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Do the license files need to be updated? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #2832 from zjffdu/ZEPPELIN-3254 and squashes the following commits:

3b06ed9 [Jeff Zhang] ZEPPELIN-3254. Enable one build of zeppelin to work with multiple versions of spark

Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/d6cdd56d
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/d6cdd56d
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/d6cdd56d

Branch: refs/heads/master
Commit: d6cdd56d8fbc551dee7c334eb403c19bc65d6e1f
Parents: d90716d
Author: Jeff Zhang <zjf...@apache.org>
Authored: Thu Feb 22 21:05:23 2018 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Tue Mar 6 09:57:18 2018 +0800

----------------------------------------------------------------------
 .travis.yml                                     |   30 +-
 .../zeppelin/spark/SparkRInterpreter.java       |    8 +-
 .../interpreter/InterpreterProperty.java        |    1 +
 zeppelin-server/notebook/.python.recovery.crc   |  Bin 12 -> 0 bytes
 zeppelin-server/notebook/python.recovery        |    1 -
 zeppelin-server/pom.xml                         |    9 +
 .../zeppelin/rest/AbstractTestRestApi.java      |  211 +---
 .../zeppelin/rest/ZeppelinSparkClusterTest.java | 1140 +++++++++---------
 .../src/test/resources/log4j.properties         |    3 +-
 zeppelin-zengine/pom.xml                        |   14 +
 .../interpreter/SparkDownloadUtils.java         |   91 ++
 .../interpreter/SparkIntegrationTest.java       |  190 +++
 .../interpreter/SparkInterpreterModeTest.java   |  163 ---
 .../src/test/resources/log4j.properties         |    6 +-
 14 files changed, 984 insertions(+), 883 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6cdd56d/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml index bc568fc..aba2944 100644 --- a/.travis.yml +++ b/.travis.yml @@ -66,7 +66,7 @@ matrix: - sudo: required jdk: "oraclejdk8" dist: trusty - env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.2.0" HADOOP_VER="2.6" SPARKR="true" PROFILE="-Pspark-2.2 -Pscalding -Phelium-dev -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" MODULES="-pl ${INTERPRETERS}" TEST_PROJECTS="-Dtests.to.exclude=**/ZeppelinSparkClusterTest.java,**/org/apache/zeppelin/spark/*,**/HeliumApplicationFactoryTest.java -DfailIfNoTests=false" + env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.2.0" HADOOP_VER="2.6" SPARKR="true" PROFILE="-Pspark-2.2 -Pscalding -Phelium-dev -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" MODULES="-pl ${INTERPRETERS}"
TEST_PROJECTS="-Dtests.to.exclude=**/SparkIntegrationTest.java,**/ZeppelinSparkClusterTest.java,**/org/apache/zeppelin/spark/*,**/HeliumApplicationFactoryTest.java -DfailIfNoTests=false" # Test selenium with spark module for 1.6.3 - jdk: "oraclejdk8" @@ -78,32 +78,46 @@ matrix: # Test interpreter modules - jdk: "openjdk7" dist: trusty - env: PYTHON="3" SCALA_VER="2.10" PROFILE="-Pscalding" BUILD_FLAG="install -DskipTests -DskipRat -Pr" TEST_FLAG="test -DskipRat" MODULES="-pl $(echo .,zeppelin-interpreter,${INTERPRETERS} | sed 's/!//g')" TEST_PROJECTS="" + env: PYTHON="3" SCALA_VER="2.10" PROFILE="-Pscalding" BUILD_FLAG="package -DskipTests -DskipRat -Pr" TEST_FLAG="test -DskipRat" MODULES="-pl $(echo .,zeppelin-interpreter,${INTERPRETERS} | sed 's/!//g')" TEST_PROJECTS="" + + # Run ZeppelinSparkClusterTest & SparkIntegrationTest in one build would exceed the time limitation of travis, so running them separately + + # Test spark interpreter with different spark versions under python2, only run ZeppelinSparkClusterTest + - sudo: required + jdk: "oraclejdk8" + dist: trusty + env: PYTHON="2" PROFILE="-Pspark-2.2" SPARKR="true" BUILD_FLAG="package -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-zengine,zeppelin-server,spark/interpreter,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest -DfailIfNoTests=false" + + # Test spark interpreter with different spark versions under python3, only run SparkIntegrationTest + - sudo: required + jdk: "oraclejdk8" + dist: trusty + env: PYTHON="3" PROFILE="-Pspark-2.2" SPARKR="true" BUILD_FLAG="package -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-zengine,spark/interpreter,spark/spark-dependencies" TEST_PROJECTS="-Dtest=SparkIntegrationTest -DfailIfNoTests=false" # Test spark module for 2.2.0 with scala 2.11 - jdk: "oraclejdk8" dist: trusty - env: PYTHON="2" SCALA_VER="2.11" SPARK_VER="2.2.0" HADOOP_VER="2.6" PROFILE="-Pspark-2.2 -Phadoop3 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-server,spark/interpreter,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.* -DfailIfNoTests=false" + env: PYTHON="2" SCALA_VER="2.11" SPARK_VER="2.2.0" HADOOP_VER="2.6" PROFILE="-Pspark-2.2 -Phadoop3 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" BUILD_FLAG="package -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl spark/interpreter,spark/spark-dependencies" TEST_PROJECTS="-Dtest=org.apache.zeppelin.spark.* -DfailIfNoTests=false" # Test spark module for 2.1.0 with scala 2.11 - jdk: "openjdk7" dist: trusty - env: PYTHON="2" SCALA_VER="2.11" SPARK_VER="2.1.0" HADOOP_VER="2.6" PROFILE="-Pspark-2.1 -Phadoop2 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-server,spark/interpreter,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.* -DfailIfNoTests=false" + env: PYTHON="2" SCALA_VER="2.11" SPARK_VER="2.1.0" HADOOP_VER="2.6" PROFILE="-Pspark-2.1 -Phadoop2 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" BUILD_FLAG="package -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl spark/interpreter,spark/spark-dependencies" TEST_PROJECTS="-Dtest=org.apache.zeppelin.spark.* -DfailIfNoTests=false" # Test spark module for 2.0.2 with scala 2.11 - jdk: "oraclejdk8" dist: trusty - env: PYTHON="2" 
SCALA_VER="2.11" SPARK_VER="2.0.2" HADOOP_VER="2.6" PROFILE="-Pspark-2.0 -Phadoop3 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-server,spark/interpreter,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.* -DfailIfNoTests=false" + env: PYTHON="2" SCALA_VER="2.11" SPARK_VER="2.0.2" HADOOP_VER="2.6" PROFILE="-Pspark-2.0 -Phadoop3 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" BUILD_FLAG="package -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl spark/interpreter,spark/spark-dependencies" TEST_PROJECTS="-Dtest=org.apache.zeppelin.spark.* -DfailIfNoTests=false" # Test spark module for 1.6.3 with scala 2.10 - jdk: "openjdk7" dist: trusty - env: PYTHON="3" SCALA_VER="2.10" SPARK_VER="1.6.3" HADOOP_VER="2.6" PROFILE="-Pspark-1.6 -Phadoop2 -Phadoop-2.6 -Pscala-2.10" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-server,spark/interpreter,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.* -DfailIfNoTests=false" + env: PYTHON="3" SCALA_VER="2.10" SPARK_VER="1.6.3" HADOOP_VER="2.6" PROFILE="-Pspark-1.6 -Phadoop2 -Phadoop-2.6 -Pscala-2.10" SPARKR="true" BUILD_FLAG="package -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl spark/interpreter,spark/spark-dependencies" TEST_PROJECTS="-Dtest=org.apache.zeppelin.spark.* -DfailIfNoTests=false" # Test spark module for 1.6.3 with scala 2.11 - jdk: "oraclejdk8" dist: trusty - env: PYTHON="2" SCALA_VER="2.11" SPARK_VER="1.6.3" HADOOP_VER="2.6" PROFILE="-Pspark-1.6 -Phadoop3 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-server,spark/interpreter,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.* -DfailIfNoTests=false" + env: PYTHON="2" SCALA_VER="2.11" SPARK_VER="1.6.3" HADOOP_VER="2.6" PROFILE="-Pspark-1.6 -Phadoop3 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" BUILD_FLAG="package -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl spark/interpreter,spark/spark-dependencies" TEST_PROJECTS="-Dtest=org.apache.zeppelin.spark.* -DfailIfNoTests=false" # Test python/pyspark with python 2, livy 0.5 - sudo: required @@ -116,7 +130,7 @@ matrix: dist: trusty jdk: "openjdk7" env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.0.0" HADOOP_VER="2.6" LIVY_VER="0.5.0-incubating" PROFILE="-Pspark-2.0 -Phadoop3 -Phadoop-2.6 -Pscala-2.11" BUILD_FLAG="install -am -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" MODULES="-pl livy" TEST_PROJECTS="-Dpyspark.test.exclude='' -DfailIfNoTests=false" - + before_install: # check files included in commit range, clear bower_components if a bower.json file has changed. 
# bower cache clearing can also be forced by putting "bower clear" or "clear bower" in a commit message http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6cdd56d/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java index dbaeabe..5efff0e 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java @@ -59,12 +59,18 @@ public class SparkRInterpreter extends Interpreter { String sparkRLibPath; if (System.getenv("SPARK_HOME") != null) { + // local or yarn-client mode when SPARK_HOME is specified sparkRLibPath = System.getenv("SPARK_HOME") + "/R/lib"; - } else { + } else if (System.getenv("ZEPPELIN_HOME") != null){ + // embedded mode when SPARK_HOME is not specified sparkRLibPath = System.getenv("ZEPPELIN_HOME") + "/interpreter/spark/R/lib"; // workaround to make sparkr work without SPARK_HOME System.setProperty("spark.test.home", System.getenv("ZEPPELIN_HOME") + "/interpreter/spark"); + } else { + // yarn-cluster mode + sparkRLibPath = "sparkr"; } + synchronized (SparkRBackend.backend()) { if (!SparkRBackend.isStarted()) { SparkRBackend.init(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6cdd56d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterProperty.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterProperty.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterProperty.java index 0bb3d42..92cf3a8 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterProperty.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterProperty.java @@ -34,6 +34,7 @@ public class InterpreterProperty { public InterpreterProperty(String name, Object value) { this.name = name; this.value = value; + this.type = InterpreterPropertyType.TEXTAREA.getValue(); } public String getName() { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6cdd56d/zeppelin-server/notebook/.python.recovery.crc ---------------------------------------------------------------------- diff --git a/zeppelin-server/notebook/.python.recovery.crc b/zeppelin-server/notebook/.python.recovery.crc deleted file mode 100644 index 6bd3e7a..0000000 Binary files a/zeppelin-server/notebook/.python.recovery.crc and /dev/null differ http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6cdd56d/zeppelin-server/notebook/python.recovery ---------------------------------------------------------------------- diff --git a/zeppelin-server/notebook/python.recovery b/zeppelin-server/notebook/python.recovery deleted file mode 100644 index eaf4938..0000000 --- a/zeppelin-server/notebook/python.recovery +++ /dev/null @@ -1 +0,0 @@ -2CZA1DVUG:shared_process 192.168.3.2:55410 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6cdd56d/zeppelin-server/pom.xml ---------------------------------------------------------------------- diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml index 970f302..1622791 100644 --- a/zeppelin-server/pom.xml +++ b/zeppelin-server/pom.xml @@ -341,6 +341,15 @@ 
</exclusion> </exclusions> </dependency> + + <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>zeppelin-zengine</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + </dependencies> <build> <plugins> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6cdd56d/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java index 5193420..dfb5ac2 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java @@ -63,72 +63,70 @@ public abstract class AbstractTestRestApi { static final String restApiUrl = "/api"; static final String url = getUrlToTest(); protected static final boolean wasRunning = checkIfServerIsRunning(); - static boolean pySpark = false; - static boolean sparkR = false; static boolean isRunningWithAuth = false; private static File shiroIni = null; private static String zeppelinShiro = "[users]\n" + - "admin = password1, admin\n" + - "user1 = password2, role1, role2\n" + - "user2 = password3, role3\n" + - "user3 = password4, role2\n" + - "[main]\n" + - "sessionManager = org.apache.shiro.web.session.mgt.DefaultWebSessionManager\n" + - "securityManager.sessionManager = $sessionManager\n" + - "securityManager.sessionManager.globalSessionTimeout = 86400000\n" + - "shiro.loginUrl = /api/login\n" + - "[roles]\n" + - "role1 = *\n" + - "role2 = *\n" + - "role3 = *\n" + - "admin = *\n" + - "[urls]\n" + - "/api/version = anon\n" + - "/** = authc"; + "admin = password1, admin\n" + + "user1 = password2, role1, role2\n" + + "user2 = password3, role3\n" + + "user3 = password4, role2\n" + + "[main]\n" + + "sessionManager = org.apache.shiro.web.session.mgt.DefaultWebSessionManager\n" + + "securityManager.sessionManager = $sessionManager\n" + + "securityManager.sessionManager.globalSessionTimeout = 86400000\n" + + "shiro.loginUrl = /api/login\n" + + "[roles]\n" + + "role1 = *\n" + + "role2 = *\n" + + "role3 = *\n" + + "admin = *\n" + + "[urls]\n" + + "/api/version = anon\n" + + "/** = authc"; private static String zeppelinShiroKnox = "[users]\n" + - "admin = password1, admin\n" + - "user1 = password2, role1, role2\n" + - "[main]\n" + - "knoxJwtRealm = org.apache.zeppelin.realm.jwt.KnoxJwtRealm\n" + - "knoxJwtRealm.providerUrl = https://domain.example.com/\n" + - "knoxJwtRealm.login = gateway/knoxsso/knoxauth/login.html\n" + - "knoxJwtRealm.logout = gateway/knoxssout/api/v1/webssout\n" + - "knoxJwtRealm.redirectParam = originalUrl\n" + - "knoxJwtRealm.cookieName = hadoop-jwt\n" + - "knoxJwtRealm.publicKeyPath = knox-sso.pem\n" + - "authc = org.apache.zeppelin.realm.jwt.KnoxAuthenticationFilter\n" + - "sessionManager = org.apache.shiro.web.session.mgt.DefaultWebSessionManager\n" + - "securityManager.sessionManager = $sessionManager\n" + - "securityManager.sessionManager.globalSessionTimeout = 86400000\n" + - "shiro.loginUrl = /api/login\n" + - "[roles]\n" + - "admin = *\n" + - "[urls]\n" + - "/api/version = anon\n" + - "/** = authc"; + "admin = password1, admin\n" + + "user1 = password2, role1, role2\n" + + "[main]\n" + + "knoxJwtRealm = org.apache.zeppelin.realm.jwt.KnoxJwtRealm\n" + + "knoxJwtRealm.providerUrl = 
https://domain.example.com/\n" + + "knoxJwtRealm.login = gateway/knoxsso/knoxauth/login.html\n" + + "knoxJwtRealm.logout = gateway/knoxssout/api/v1/webssout\n" + + "knoxJwtRealm.redirectParam = originalUrl\n" + + "knoxJwtRealm.cookieName = hadoop-jwt\n" + + "knoxJwtRealm.publicKeyPath = knox-sso.pem\n" + + "authc = org.apache.zeppelin.realm.jwt.KnoxAuthenticationFilter\n" + + "sessionManager = org.apache.shiro.web.session.mgt.DefaultWebSessionManager\n" + + "securityManager.sessionManager = $sessionManager\n" + + "securityManager.sessionManager.globalSessionTimeout = 86400000\n" + + "shiro.loginUrl = /api/login\n" + + "[roles]\n" + + "admin = *\n" + + "[urls]\n" + + "/api/version = anon\n" + + "/** = authc"; private static File knoxSsoPem = null; private static String KNOX_SSO_PEM = "-----BEGIN CERTIFICATE-----\n" - + "MIIChjCCAe+gAwIBAgIJALYrdDEXKwcqMA0GCSqGSIb3DQEBBQUAMIGEMQswCQYD\n" - + "VQQGEwJVUzENMAsGA1UECBMEVGVzdDENMAsGA1UEBxMEVGVzdDEPMA0GA1UEChMG\n" - + "SGFkb29wMQ0wCwYDVQQLEwRUZXN0MTcwNQYDVQQDEy5jdHItZTEzNS0xNTEyMDY5\n" - + "MDMyOTc1LTU0NDctMDEtMDAwMDAyLmh3eC5zaXRlMB4XDTE3MTIwNDA5NTIwMFoX\n" - + "DTE4MTIwNDA5NTIwMFowgYQxCzAJBgNVBAYTAlVTMQ0wCwYDVQQIEwRUZXN0MQ0w\n" - + "CwYDVQQHEwRUZXN0MQ8wDQYDVQQKEwZIYWRvb3AxDTALBgNVBAsTBFRlc3QxNzA1\n" - + "BgNVBAMTLmN0ci1lMTM1LTE1MTIwNjkwMzI5NzUtNTQ0Ny0wMS0wMDAwMDIuaHd4\n" - + "LnNpdGUwgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAILFoXdz3yCy2INncYM2\n" - + "y72fYrONoQIxeeIzeJIibXLTuowSju90Q6aThSyUsQ6NEia2flnlKiCgINTNAodh\n" - + "UPUVGyGT+NMrqJzzpXAll2UUa6gIUPnXYEzYNkMIpbQOAo5BAg7YamaidbPPiT3W\n" - + "wAD1rWo3AMUY+nZJrAi4dEH5AgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAB0R07/lo\n" - + "4hD+WeDEeyLTnsbFnPNXxBT1APMUmmuCjcky/19ZB8OphqTKIITONdOK/XHdjZHG\n" - + "JDOfhBkVknL42lSi45ahUAPS2PZOlQL08MbS8xajP1faterm+aHcdwJVK9dK76RB\n" - + "/bA8TFNPblPxavIOcd+R+RfFmT1YKfYIhco=\n" - + "-----END CERTIFICATE-----"; + + "MIIChjCCAe+gAwIBAgIJALYrdDEXKwcqMA0GCSqGSIb3DQEBBQUAMIGEMQswCQYD\n" + + "VQQGEwJVUzENMAsGA1UECBMEVGVzdDENMAsGA1UEBxMEVGVzdDEPMA0GA1UEChMG\n" + + "SGFkb29wMQ0wCwYDVQQLEwRUZXN0MTcwNQYDVQQDEy5jdHItZTEzNS0xNTEyMDY5\n" + + "MDMyOTc1LTU0NDctMDEtMDAwMDAyLmh3eC5zaXRlMB4XDTE3MTIwNDA5NTIwMFoX\n" + + "DTE4MTIwNDA5NTIwMFowgYQxCzAJBgNVBAYTAlVTMQ0wCwYDVQQIEwRUZXN0MQ0w\n" + + "CwYDVQQHEwRUZXN0MQ8wDQYDVQQKEwZIYWRvb3AxDTALBgNVBAsTBFRlc3QxNzA1\n" + + "BgNVBAMTLmN0ci1lMTM1LTE1MTIwNjkwMzI5NzUtNTQ0Ny0wMS0wMDAwMDIuaHd4\n" + + "LnNpdGUwgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAILFoXdz3yCy2INncYM2\n" + + "y72fYrONoQIxeeIzeJIibXLTuowSju90Q6aThSyUsQ6NEia2flnlKiCgINTNAodh\n" + + "UPUVGyGT+NMrqJzzpXAll2UUa6gIUPnXYEzYNkMIpbQOAo5BAg7YamaidbPPiT3W\n" + + "wAD1rWo3AMUY+nZJrAi4dEH5AgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAB0R07/lo\n" + + "4hD+WeDEeyLTnsbFnPNXxBT1APMUmmuCjcky/19ZB8OphqTKIITONdOK/XHdjZHG\n" + + "JDOfhBkVknL42lSi45ahUAPS2PZOlQL08MbS8xajP1faterm+aHcdwJVK9dK76RB\n" + + "/bA8TFNPblPxavIOcd+R+RfFmT1YKfYIhco=\n" + + "-----END CERTIFICATE-----"; protected static File zeppelinHome; protected static File confDir; @@ -159,7 +157,7 @@ public abstract class AbstractTestRestApi { @Override public void run() { try { - ZeppelinServer.main(new String[] {""}); + ZeppelinServer.main(new String[]{""}); } catch (Exception e) { LOG.error("Exception in WebDriverManager while getWebDriver ", e); throw new RuntimeException(e); @@ -190,7 +188,7 @@ public abstract class AbstractTestRestApi { isRunningWithAuth = true; // Set Anonymous session to false. System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_ANONYMOUS_ALLOWED.getVarName(), "false"); - + // Create a shiro env test. 
shiroIni = new File(confDir, "shiro.ini"); if (!shiroIni.exists()) { @@ -246,66 +244,10 @@ public abstract class AbstractTestRestApi { throw new RuntimeException("Can not start Zeppelin server"); } LOG.info("Test Zeppelin stared."); - - - // assume first one is spark - InterpreterSetting sparkIntpSetting = null; - for(InterpreterSetting intpSetting : - ZeppelinServer.notebook.getInterpreterSettingManager().get()) { - if (intpSetting.getName().equals("spark")) { - sparkIntpSetting = intpSetting; - } - } - - Map<String, InterpreterProperty> sparkProperties = - (Map<String, InterpreterProperty>) sparkIntpSetting.getProperties(); - // ci environment runs spark cluster for testing - // so configure zeppelin use spark cluster - if ("true".equals(System.getenv("CI"))) { - // set spark master and other properties - sparkProperties.put("master", - new InterpreterProperty("master", "local[2]", InterpreterPropertyType.TEXTAREA.getValue())); - sparkProperties.put("spark.master", - new InterpreterProperty("spark.master", "local[2]", InterpreterPropertyType.TEXTAREA.getValue())); - sparkProperties.put("spark.cores.max", - new InterpreterProperty("spark.cores.max", "2", InterpreterPropertyType.TEXTAREA.getValue())); - sparkProperties.put("zeppelin.spark.useHiveContext", - new InterpreterProperty("zeppelin.spark.useHiveContext", false, InterpreterPropertyType.CHECKBOX.getValue())); - sparkProperties.put("zeppelin.pyspark.useIPython", new InterpreterProperty("zeppelin.pyspark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue())); - sparkProperties.put("zeppelin.spark.test", new InterpreterProperty("zeppelin.spark.test", "true", InterpreterPropertyType.TEXTAREA.getValue())); - sparkIntpSetting.setProperties(sparkProperties); - pySpark = true; - sparkR = true; - ZeppelinServer.notebook.getInterpreterSettingManager().restart(sparkIntpSetting.getId()); - } else { - String sparkHome = getSparkHome(); - LOG.info("SPARK HOME detected " + sparkHome); - if (sparkHome != null) { - if (System.getenv("SPARK_MASTER") != null) { - sparkProperties.put("master", - new InterpreterProperty("master", System.getenv("SPARK_MASTER"), InterpreterPropertyType.TEXTAREA.getValue())); - } else { - sparkProperties.put("master", - new InterpreterProperty("master", "local[2]", InterpreterPropertyType.TEXTAREA.getValue())); - } - sparkProperties.put("spark.master", - new InterpreterProperty("spark.master", "local[2]", InterpreterPropertyType.TEXTAREA.getValue())); - sparkProperties.put("spark.cores.max", - new InterpreterProperty("spark.cores.max", "2", InterpreterPropertyType.TEXTAREA.getValue())); - sparkProperties.put("zeppelin.spark.useHiveContext", - new InterpreterProperty("zeppelin.spark.useHiveContext", false, InterpreterPropertyType.CHECKBOX.getValue())); - sparkProperties.put("zeppelin.pyspark.useIPython", new InterpreterProperty("zeppelin.pyspark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue())); - sparkProperties.put("zeppelin.spark.test", new InterpreterProperty("zeppelin.spark.test", "true", InterpreterPropertyType.TEXTAREA.getValue())); - - pySpark = true; - sparkR = true; - } - - ZeppelinServer.notebook.getInterpreterSettingManager().restart(sparkIntpSetting.getId()); - } } } + protected static void startUpWithKnoxEnable(String testClassName) throws Exception { start(true, testClassName, true); } @@ -327,53 +269,12 @@ public abstract class AbstractTestRestApi { } } - private static String getSparkHome() { - String sparkHome = System.getenv("SPARK_HOME"); - if (sparkHome != null) { - return 
sparkHome; - } - sparkHome = getSparkHomeRecursively(new File(System.getProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName()))); - return sparkHome; - } - - boolean isPyspark() { - return pySpark; - } - - boolean isSparkR() { - return sparkR; - } - - private static String getSparkHomeRecursively(File dir) { - if (dir == null) return null; - File files [] = dir.listFiles(); - if (files == null) return null; - - File homeDetected = null; - for (File f : files) { - if (isActiveSparkHome(f)) { - homeDetected = f; - break; - } - } - - if (homeDetected != null) { - return homeDetected.getAbsolutePath(); - } else { - return getSparkHomeRecursively(dir.getParentFile()); - } - } - - private static boolean isActiveSparkHome(File dir) { - return dir.getName().matches("spark-[0-9\\.]+[A-Za-z-]*-bin-hadoop[0-9\\.]+"); - } - protected static void shutDown() throws Exception { shutDown(true); } protected static void shutDown(final boolean deleteConfDir) throws Exception { - if (!wasRunning) { + if (!wasRunning && ZeppelinServer.notebook != null) { // restart interpreter to stop all interpreter processes List<InterpreterSetting> settingList = ZeppelinServer.notebook.getInterpreterSettingManager().get(); if (!ZeppelinServer.notebook.getConf().isRecoveryEnabled()) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6cdd56d/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index f3a7099..bda555a 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -16,588 +16,630 @@ */ package org.apache.zeppelin.rest; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.IOException; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterProperty; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.interpreter.SparkDownloadUtils; import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.Paragraph; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.server.ZeppelinServer; import org.apache.zeppelin.user.AuthenticationInfo; import org.junit.AfterClass; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URL; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; /** * 
Test against spark cluster. - * Spark cluster is started by CI server using testing/startSparkCluster.sh */ +@RunWith(value = Parameterized.class) public class ZeppelinSparkClusterTest extends AbstractTestRestApi { - AuthenticationInfo anonymous; - - @BeforeClass - public static void init() throws Exception { - AbstractTestRestApi.startUp(ZeppelinSparkClusterTest.class.getSimpleName()); - } - - @AfterClass - public static void destroy() throws Exception { - AbstractTestRestApi.shutDown(); - } - - @Before - public void setUp() { - anonymous = new AuthenticationInfo("anonymous"); + private static Logger LOGGER = LoggerFactory.getLogger(ZeppelinSparkClusterTest.class); + + private String sparkVersion; + private AuthenticationInfo anonymous = new AuthenticationInfo("anonymous"); + + public ZeppelinSparkClusterTest(String sparkVersion) throws Exception { + this.sparkVersion = sparkVersion; + LOGGER.info("Testing SparkVersion: " + sparkVersion); + String sparkHome = SparkDownloadUtils.downloadSpark(sparkVersion); + setupSparkInterpreter(sparkHome); + verifySparkVersionNumber(); + } + + @Parameterized.Parameters + public static List<Object[]> data() { + return Arrays.asList(new Object[][]{ + {"2.2.1"}, + {"2.1.2"}, + {"2.0.2"}, + {"1.6.3"} + }); + } + + public void setupSparkInterpreter(String sparkHome) throws InterpreterException { + InterpreterSetting sparkIntpSetting = ZeppelinServer.notebook.getInterpreterSettingManager() + .getInterpreterSettingByName("spark"); + + Map<String, InterpreterProperty> sparkProperties = + (Map<String, InterpreterProperty>) sparkIntpSetting.getProperties(); + LOG.info("SPARK HOME detected " + sparkHome); + if (System.getenv("SPARK_MASTER") != null) { + sparkProperties.put("master", + new InterpreterProperty("master", System.getenv("SPARK_MASTER"))); + } else { + sparkProperties.put("master", new InterpreterProperty("master", "local[2]")); } - - private void waitForFinish(Paragraph p) { - while (p.getStatus() != Status.FINISHED - && p.getStatus() != Status.ERROR - && p.getStatus() != Status.ABORT) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - LOG.error("Exception in WebDriverManager while getWebDriver ", e); - } - } - } - - @Test - public void scalaOutputTest() throws IOException { - // create new note - Note note = ZeppelinServer.notebook.createNote(anonymous); - Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - Map config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); - p.setText("%spark import java.util.Date\n" + - "import java.net.URL\n" + - "println(\"hello\")\n" - ); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); - assertEquals(Status.FINISHED, p.getStatus()); - assertEquals("import java.util.Date\n" + - "import java.net.URL\n" + - "hello\n", p.getResult().message().get(0).getData()); - ZeppelinServer.notebook.removeNote(note.getId(), anonymous); - } - - - - @Test - public void basicRDDTransformationAndActionTest() throws IOException { - // create new note - Note note = ZeppelinServer.notebook.createNote(anonymous); - - // run markdown paragraph, again - Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - Map config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); - p.setText("%spark print(sc.parallelize(1 to 10).reduce(_ + _))"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); - assertEquals(Status.FINISHED, p.getStatus()); - assertEquals("55", p.getResult().message().get(0).getData()); - 
ZeppelinServer.notebook.removeNote(note.getId(), anonymous); + sparkProperties.put("SPARK_HOME", new InterpreterProperty("SPARK_HOME", sparkHome)); + sparkProperties.put("spark.master", new InterpreterProperty("spark.master", "local[2]")); + sparkProperties.put("spark.cores.max", + new InterpreterProperty("spark.cores.max", "2")); + sparkProperties.put("zeppelin.spark.useHiveContext", + new InterpreterProperty("zeppelin.spark.useHiveContext", "false")); + sparkProperties.put("zeppelin.pyspark.useIPython", new InterpreterProperty("zeppelin.pyspark.useIPython", "false")); + sparkProperties.put("zeppelin.spark.useNew", new InterpreterProperty("zeppelin.spark.useNew", "true")); + sparkProperties.put("zeppelin.spark.test", new InterpreterProperty("zeppelin.spark.test", "true")); + + ZeppelinServer.notebook.getInterpreterSettingManager().restart(sparkIntpSetting.getId()); + } + + @BeforeClass + public static void setUp() throws Exception { + AbstractTestRestApi.startUp(ZeppelinSparkClusterTest.class.getSimpleName()); + } + + @AfterClass + public static void destroy() throws Exception { + AbstractTestRestApi.shutDown(); + } + + private void waitForFinish(Paragraph p) { + while (p.getStatus() != Status.FINISHED + && p.getStatus() != Status.ERROR + && p.getStatus() != Status.ABORT) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + LOG.error("Exception in WebDriverManager while getWebDriver ", e); + } } - - @Test - public void sparkSQLTest() throws IOException { - // create new note - Note note = ZeppelinServer.notebook.createNote(anonymous); - int sparkVersion = getSparkVersionNumber(note); - // DataFrame API is available from spark 1.3 - if (sparkVersion >= 13) { - // test basic dataframe api - Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - Map config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); - p.setText("%spark val df=sqlContext.createDataFrame(Seq((\"hello\",20)))\n" + - "df.collect()"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); - assertEquals(Status.FINISHED, p.getStatus()); - assertTrue(p.getResult().message().get(0).getData().contains( - "Array[org.apache.spark.sql.Row] = Array([hello,20])")); - - // test display DataFrame - p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); - p.setText("%spark val df=sqlContext.createDataFrame(Seq((\"hello\",20)))\n" + - "z.show(df)"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); - assertEquals(Status.FINISHED, p.getStatus()); - assertEquals(InterpreterResult.Type.TABLE, p.getResult().message().get(1).getType()); - assertEquals("_1\t_2\nhello\t20\n", p.getResult().message().get(1).getData()); - - // test display DataSet - if (sparkVersion >= 20) { - p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); - p.setText("%spark val ds=spark.createDataset(Seq((\"hello\",20)))\n" + - "z.show(ds)"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); - assertEquals(Status.FINISHED, p.getStatus()); - assertEquals(InterpreterResult.Type.TABLE, p.getResult().message().get(1).getType()); - assertEquals("_1\t_2\nhello\t20\n", p.getResult().message().get(1).getData()); - } - } - ZeppelinServer.notebook.removeNote(note.getId(), anonymous); + } + + @Test + public void scalaOutputTest() throws IOException { + // create new note + Note note = 
ZeppelinServer.notebook.createNote(anonymous); + Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); + Map config = p.getConfig(); + config.put("enabled", true); + p.setConfig(config); + p.setText("%spark import java.util.Date\n" + + "import java.net.URL\n" + + "println(\"hello\")\n" + ); + p.setAuthenticationInfo(anonymous); + note.run(p.getId()); + waitForFinish(p); + assertEquals(Status.FINISHED, p.getStatus()); + assertEquals("import java.util.Date\n" + + "import java.net.URL\n" + + "hello\n", p.getResult().message().get(0).getData()); + ZeppelinServer.notebook.removeNote(note.getId(), anonymous); + } + + + @Test + public void basicRDDTransformationAndActionTest() throws IOException { + // create new note + Note note = ZeppelinServer.notebook.createNote(anonymous); + + // run markdown paragraph, again + Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); + Map config = p.getConfig(); + config.put("enabled", true); + p.setConfig(config); + p.setText("%spark print(sc.parallelize(1 to 10).reduce(_ + _))"); + p.setAuthenticationInfo(anonymous); + note.run(p.getId()); + waitForFinish(p); + assertEquals(Status.FINISHED, p.getStatus()); + assertEquals("55", p.getResult().message().get(0).getData()); + ZeppelinServer.notebook.removeNote(note.getId(), anonymous); + } + + @Test + public void sparkSQLTest() throws IOException { + // create new note + Note note = ZeppelinServer.notebook.createNote(anonymous); + // test basic dataframe api + Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); + Map config = p.getConfig(); + config.put("enabled", true); + p.setConfig(config); + p.setText("%spark val df=sqlContext.createDataFrame(Seq((\"hello\",20)))\n" + + "df.collect()"); + p.setAuthenticationInfo(anonymous); + note.run(p.getId()); + waitForFinish(p); + assertEquals(Status.FINISHED, p.getStatus()); + assertTrue(p.getResult().message().get(0).getData().contains( + "Array[org.apache.spark.sql.Row] = Array([hello,20])")); + + // test display DataFrame + p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); + config = p.getConfig(); + config.put("enabled", true); + p.setConfig(config); + p.setText("%spark val df=sqlContext.createDataFrame(Seq((\"hello\",20)))\n" + + "z.show(df)"); + p.setAuthenticationInfo(anonymous); + note.run(p.getId()); + waitForFinish(p); + assertEquals(Status.FINISHED, p.getStatus()); + assertEquals(InterpreterResult.Type.TABLE, p.getResult().message().get(1).getType()); + assertEquals("_1\t_2\nhello\t20\n", p.getResult().message().get(1).getData()); + + // test display DataSet + if (isSpark2()) { + p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); + config = p.getConfig(); + config.put("enabled", true); + p.setConfig(config); + p.setText("%spark val ds=spark.createDataset(Seq((\"hello\",20)))\n" + + "z.show(ds)"); + p.setAuthenticationInfo(anonymous); + note.run(p.getId()); + waitForFinish(p); + assertEquals(Status.FINISHED, p.getStatus()); + assertEquals(InterpreterResult.Type.TABLE, p.getResult().message().get(1).getType()); + assertEquals("_1\t_2\nhello\t20\n", p.getResult().message().get(1).getData()); } - @Test - public void sparkRTest() throws IOException, InterpreterException { - // create new note - Note note = ZeppelinServer.notebook.createNote(anonymous); - int sparkVersion = getSparkVersionNumber(note); - - if (isSparkR() && sparkVersion >= 14) { // sparkr supported from 1.4.0 - // restart spark interpreter - List<InterpreterSetting> settings = - ZeppelinServer.notebook.getBindedInterpreterSettings(note.getId()); - - 
for (InterpreterSetting setting : settings) { - if (setting.getName().equals("spark")) { - ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId()); - break; - } - } - - String sqlContextName = "sqlContext"; - if (sparkVersion >= 20) { - sqlContextName = "spark"; - } - Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - Map config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); - p.setText("%r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 23, 18))\n" + - "df <- createDataFrame(" + sqlContextName + ", localDF)\n" + - "count(df)" - ); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); - System.err.println("sparkRTest=" + p.getResult().message().get(0).getData()); - assertEquals(Status.FINISHED, p.getStatus()); - assertEquals("[1] 3", p.getResult().message().get(0).getData().trim()); + ZeppelinServer.notebook.removeNote(note.getId(), anonymous); + } + + @Test + public void sparkRTest() throws IOException, InterpreterException { + // create new note + Note note = ZeppelinServer.notebook.createNote(anonymous); + // restart spark interpreter + List<InterpreterSetting> settings = + ZeppelinServer.notebook.getBindedInterpreterSettings(note.getId()); + + for (InterpreterSetting setting : settings) { + if (setting.getName().equals("spark")) { + ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId()); + break; } - ZeppelinServer.notebook.removeNote(note.getId(), anonymous); } - @Test - public void pySparkTest() throws IOException { - // create new note - Note note = ZeppelinServer.notebook.createNote(anonymous); - note.setName("note"); - int sparkVersion = getSparkVersionNumber(note); - - if (isPyspark() && sparkVersion >= 12) { // pyspark supported from 1.2.1 - // run markdown paragraph, again - Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - Map config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); - p.setText("%pyspark print(sc.parallelize(range(1, 11)).reduce(lambda a, b: a + b))"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); - assertEquals(Status.FINISHED, p.getStatus()); - assertEquals("55\n", p.getResult().message().get(0).getData()); - if (sparkVersion >= 13) { - // run sqlContext test - p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); - p.setText("%pyspark from pyspark.sql import Row\n" + - "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" + - "df.collect()"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); - assertEquals(Status.FINISHED, p.getStatus()); - assertEquals("[Row(age=20, id=1)]\n", p.getResult().message().get(0).getData()); - - // test display Dataframe - p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); - p.setText("%pyspark from pyspark.sql import Row\n" + - "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" + - "z.show(df)"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); - assertEquals(Status.FINISHED, p.getStatus()); - assertEquals(InterpreterResult.Type.TABLE, p.getResult().message().get(0).getType()); - // TODO (zjffdu), one more \n is appended, need to investigate why. 
- assertEquals("age\tid\n20\t1\n", p.getResult().message().get(0).getData()); - - // test udf - p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); - p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" + - "sqlContext.sql(\"select f1(\\\"abc\\\") as len\").collect()"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); - assertEquals(Status.FINISHED, p.getStatus()); - assertTrue("[Row(len=u'3')]\n".equals(p.getResult().message().get(0).getData()) || - "[Row(len='3')]\n".equals(p.getResult().message().get(0).getData())); - - // test exception - p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); - /** - %pyspark - a=1 - - print(a2) - */ - p.setText("%pyspark a=1\n\nprint(a2)"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); - assertEquals(Status.ERROR, p.getStatus()); - assertTrue(p.getResult().message().get(0).getData() - .contains("Fail to execute line 3: print(a2)")); - assertTrue(p.getResult().message().get(0).getData() - .contains("name 'a2' is not defined")); - } - if (sparkVersion >= 20) { - // run SparkSession test - p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); - p.setText("%pyspark from pyspark.sql import Row\n" + - "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" + - "df.collect()"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); - assertEquals(Status.FINISHED, p.getStatus()); - assertEquals("[Row(age=20, id=1)]\n", p.getResult().message().get(0).getData()); - - // test udf - p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); - // use SQLContext to register UDF but use this UDF through SparkSession - p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" + - "spark.sql(\"select f1(\\\"abc\\\") as len\").collect()"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); - assertEquals(Status.FINISHED, p.getStatus()); - assertTrue("[Row(len=u'3')]\n".equals(p.getResult().message().get(0).getData()) || - "[Row(len='3')]\n".equals(p.getResult().message().get(0).getData())); - } - } - ZeppelinServer.notebook.removeNote(note.getId(), anonymous); + String sqlContextName = "sqlContext"; + if (isSpark2()) { + sqlContextName = "spark"; } - - @Test - public void pySparkAutoConvertOptionTest() throws IOException { - // create new note - Note note = ZeppelinServer.notebook.createNote(anonymous); - note.setName("note"); - - int sparkVersionNumber = getSparkVersionNumber(note); - - if (isPyspark() && sparkVersionNumber >= 14) { // auto_convert enabled from spark 1.4 - // run markdown paragraph, again - Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - Map config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); - - String sqlContextName = "sqlContext"; - if (sparkVersionNumber >= 20) { - sqlContextName = "spark"; - } - - p.setText("%pyspark\nfrom pyspark.sql.functions import *\n" - + "print(" + sqlContextName + ".range(0, 10).withColumn('uniform', rand(seed=10) * 3.14).count())"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); - assertEquals(Status.FINISHED, p.getStatus()); - assertEquals("10\n", 
p.getResult().message().get(0).getData()); - } - ZeppelinServer.notebook.removeNote(note.getId(), anonymous); + Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); + Map config = p.getConfig(); + config.put("enabled", true); + p.setConfig(config); + p.setText("%r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 23, 18))\n" + + "df <- createDataFrame(" + sqlContextName + ", localDF)\n" + + "count(df)" + ); + p.setAuthenticationInfo(anonymous); + note.run(p.getId()); + waitForFinish(p); + System.err.println("sparkRTest=" + p.getResult().message().get(0).getData()); + assertEquals(Status.FINISHED, p.getStatus()); + assertEquals("[1] 3", p.getResult().message().get(0).getData().trim()); + + ZeppelinServer.notebook.removeNote(note.getId(), anonymous); + } + + @Test + public void pySparkTest() throws IOException { + // create new note + Note note = ZeppelinServer.notebook.createNote(anonymous); + note.setName("note"); + + // run markdown paragraph, again + Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); + Map config = p.getConfig(); + config.put("enabled", true); + p.setConfig(config); + p.setText("%pyspark print(sc.parallelize(range(1, 11)).reduce(lambda a, b: a + b))"); + p.setAuthenticationInfo(anonymous); + note.run(p.getId()); + waitForFinish(p); + assertEquals(Status.FINISHED, p.getStatus()); + assertEquals("55\n", p.getResult().message().get(0).getData()); + if (!isSpark2()) { + // run sqlContext test + p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); + config = p.getConfig(); + config.put("enabled", true); + p.setConfig(config); + p.setText("%pyspark from pyspark.sql import Row\n" + + "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" + + "df.collect()"); + p.setAuthenticationInfo(anonymous); + note.run(p.getId()); + waitForFinish(p); + assertEquals(Status.FINISHED, p.getStatus()); + assertEquals("[Row(age=20, id=1)]\n", p.getResult().message().get(0).getData()); + + // test display Dataframe + p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); + config = p.getConfig(); + config.put("enabled", true); + p.setConfig(config); + p.setText("%pyspark from pyspark.sql import Row\n" + + "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" + + "z.show(df)"); + p.setAuthenticationInfo(anonymous); + note.run(p.getId()); + waitForFinish(p); + assertEquals(Status.FINISHED, p.getStatus()); + assertEquals(InterpreterResult.Type.TABLE, p.getResult().message().get(0).getType()); + // TODO (zjffdu), one more \n is appended, need to investigate why. 
+ assertEquals("age\tid\n20\t1\n", p.getResult().message().get(0).getData()); + + // test udf + p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); + config = p.getConfig(); + config.put("enabled", true); + p.setConfig(config); + p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" + + "sqlContext.sql(\"select f1(\\\"abc\\\") as len\").collect()"); + p.setAuthenticationInfo(anonymous); + note.run(p.getId()); + waitForFinish(p); + assertEquals(Status.FINISHED, p.getStatus()); + assertTrue("[Row(len=u'3')]\n".equals(p.getResult().message().get(0).getData()) || + "[Row(len='3')]\n".equals(p.getResult().message().get(0).getData())); + + // test exception + p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); + config = p.getConfig(); + config.put("enabled", true); + p.setConfig(config); + /** + %pyspark + a=1 + + print(a2) + */ + p.setText("%pyspark a=1\n\nprint(a2)"); + p.setAuthenticationInfo(anonymous); + note.run(p.getId()); + waitForFinish(p); + assertEquals(Status.ERROR, p.getStatus()); + assertTrue(p.getResult().message().get(0).getData() + .contains("Fail to execute line 3: print(a2)")); + assertTrue(p.getResult().message().get(0).getData() + .contains("name 'a2' is not defined")); + } else { + // run SparkSession test + p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); + config = p.getConfig(); + config.put("enabled", true); + p.setConfig(config); + p.setText("%pyspark from pyspark.sql import Row\n" + + "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" + + "df.collect()"); + p.setAuthenticationInfo(anonymous); + note.run(p.getId()); + waitForFinish(p); + assertEquals(Status.FINISHED, p.getStatus()); + assertEquals("[Row(age=20, id=1)]\n", p.getResult().message().get(0).getData()); + + // test udf + p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); + config = p.getConfig(); + config.put("enabled", true); + p.setConfig(config); + // use SQLContext to register UDF but use this UDF through SparkSession + p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" + + "spark.sql(\"select f1(\\\"abc\\\") as len\").collect()"); + p.setAuthenticationInfo(anonymous); + note.run(p.getId()); + waitForFinish(p); + assertEquals(Status.FINISHED, p.getStatus()); + assertTrue("[Row(len=u'3')]\n".equals(p.getResult().message().get(0).getData()) || + "[Row(len='3')]\n".equals(p.getResult().message().get(0).getData())); } - @Test - public void zRunTest() throws IOException { - // create new note - Note note = ZeppelinServer.notebook.createNote(anonymous); - Paragraph p0 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - Map config0 = p0.getConfig(); - config0.put("enabled", true); - p0.setConfig(config0); - p0.setText("%spark z.run(1)"); - p0.setAuthenticationInfo(anonymous); - Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - Map config1 = p1.getConfig(); - config1.put("enabled", true); - p1.setConfig(config1); - p1.setText("%spark val a=10"); - p1.setAuthenticationInfo(anonymous); - Paragraph p2 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - Map config2 = p2.getConfig(); - config2.put("enabled", true); - p2.setConfig(config2); - p2.setText("%spark print(a)"); - p2.setAuthenticationInfo(anonymous); - - note.run(p0.getId()); - waitForFinish(p0); - assertEquals(Status.FINISHED, p0.getStatus()); - - // z.run is not blocking call. So p1 may not be finished when p0 is done. 
- waitForFinish(p1); - note.run(p2.getId()); - waitForFinish(p2); - assertEquals(Status.FINISHED, p2.getStatus()); - assertEquals("10", p2.getResult().message().get(0).getData()); - - Paragraph p3 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - Map config3 = p3.getConfig(); - config3.put("enabled", true); - p3.setConfig(config3); - p3.setText("%spark println(new java.util.Date())"); - p3.setAuthenticationInfo(anonymous); - - p0.setText(String.format("%%spark z.runNote(\"%s\")", note.getId())); - note.run(p0.getId()); - waitForFinish(p0); - waitForFinish(p1); - waitForFinish(p2); - waitForFinish(p3); - - assertEquals(Status.FINISHED, p3.getStatus()); - String p3result = p3.getResult().message().get(0).getData(); - assertNotEquals(null, p3result); - assertNotEquals("", p3result); - - p0.setText(String.format("%%spark z.run(\"%s\", \"%s\")", note.getId(), p3.getId())); - p3.setText("%%spark println(\"END\")"); - - note.run(p0.getId()); - waitForFinish(p0); - waitForFinish(p3); - - assertNotEquals(p3result, p3.getResult().message()); - - ZeppelinServer.notebook.removeNote(note.getId(), anonymous); - } + ZeppelinServer.notebook.removeNote(note.getId(), anonymous); + } - @Test - public void pySparkDepLoaderTest() throws IOException, InterpreterException { - // create new note - Note note = ZeppelinServer.notebook.createNote(anonymous); - int sparkVersionNumber = getSparkVersionNumber(note); - - if (isPyspark() && sparkVersionNumber >= 14) { - // restart spark interpreter - List<InterpreterSetting> settings = - ZeppelinServer.notebook.getBindedInterpreterSettings(note.getId()); - - for (InterpreterSetting setting : settings) { - if (setting.getName().equals("spark")) { - ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId()); - break; - } - } - - // load dep - Paragraph p0 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - Map config = p0.getConfig(); - config.put("enabled", true); - p0.setConfig(config); - p0.setText("%dep z.load(\"com.databricks:spark-csv_2.11:1.2.0\")"); - p0.setAuthenticationInfo(anonymous); - note.run(p0.getId()); - waitForFinish(p0); - assertEquals(Status.FINISHED, p0.getStatus()); - - // write test csv file - File tmpFile = File.createTempFile("test", "csv"); - FileUtils.write(tmpFile, "a,b\n1,2"); - - // load data using libraries from dep loader - Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - p1.setConfig(config); - - String sqlContextName = "sqlContext"; - if (sparkVersionNumber >= 20) { - sqlContextName = "spark"; - } - p1.setText("%pyspark\n" + - "from pyspark.sql import SQLContext\n" + - "print(" + sqlContextName + ".read.format('com.databricks.spark.csv')" + - ".load('" + tmpFile.getAbsolutePath() +"').count())"); - p1.setAuthenticationInfo(anonymous); - note.run(p1.getId()); - - waitForFinish(p1); - assertEquals(Status.FINISHED, p1.getStatus()); - assertEquals("2\n", p1.getResult().message().get(0).getData()); - } - ZeppelinServer.notebook.removeNote(note.getId(), anonymous); - } + @Test + public void pySparkAutoConvertOptionTest() throws IOException { + // create new note + Note note = ZeppelinServer.notebook.createNote(anonymous); + note.setName("note"); - /** - * Get spark version number as a numerical value. - * eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ... 
- */ - private int getSparkVersionNumber(Note note) { - Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - note.setName("note"); - Map config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); - p.setText("%spark print(sc.version)"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); - assertEquals(Status.FINISHED, p.getStatus()); - String sparkVersion = p.getResult().message().get(0).getData(); - System.out.println("Spark version detected " + sparkVersion); - String[] split = sparkVersion.split("\\."); - int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]); - return version; - } + // run markdown paragraph, again + Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); + Map config = p.getConfig(); + config.put("enabled", true); + p.setConfig(config); - @Test - public void testSparkZeppelinContextDynamicForms() throws IOException { - Note note = ZeppelinServer.notebook.createNote(anonymous); - Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - note.setName("note"); - Map config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); - String code = "%spark.spark println(z.textbox(\"my_input\", \"default_name\"))\n" + - "println(z.select(\"my_select\", \"1\"," + - "Seq((\"1\", \"select_1\"), (\"2\", \"select_2\"))))\n" + - "val items=z.checkbox(\"my_checkbox\", Seq(\"2\"), " + - "Seq((\"1\", \"check_1\"), (\"2\", \"check_2\")))\n" + - "println(items(0))"; - p.setText(code); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); - - assertEquals(Status.FINISHED, p.getStatus()); - Iterator<String> formIter = p.settings.getForms().keySet().iterator(); - assert(formIter.next().equals("my_input")); - assert(formIter.next().equals("my_select")); - assert(formIter.next().equals("my_checkbox")); - - // check dynamic forms values - String[] result = p.getResult().message().get(0).getData().split("\n"); - assertEquals(4, result.length); - assertEquals("default_name", result[0]); - assertEquals("1", result[1]); - assertEquals("items: Seq[Object] = Buffer(2)", result[2]); - assertEquals("2", result[3]); - - ZeppelinServer.notebook.removeNote(note.getId(), anonymous); + String sqlContextName = "sqlContext"; + if (isSpark2()) { + sqlContextName = "spark"; } - @Test - public void testPySparkZeppelinContextDynamicForms() throws IOException { - Note note = ZeppelinServer.notebook.createNote(anonymous); - Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - note.setName("note"); - Map config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); - String code = "%spark.pyspark print(z.input('my_input', 'default_name'))\n" + - "print(z.select('my_select', " + - "[('1', 'select_1'), ('2', 'select_2')], defaultValue='1'))\n" + - "items=z.checkbox('my_checkbox', " + - "[('1', 'check_1'), ('2', 'check_2')], defaultChecked=['2'])\n" + - "print(items[0])"; - p.setText(code); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); - - assertEquals(Status.FINISHED, p.getStatus()); - Iterator<String> formIter = p.settings.getForms().keySet().iterator(); - assert(formIter.next().equals("my_input")); - assert(formIter.next().equals("my_select")); - assert(formIter.next().equals("my_checkbox")); - - // check dynamic forms values - String[] result = p.getResult().message().get(0).getData().split("\n"); - assertEquals(3, result.length); - assertEquals("default_name", result[0]); - assertEquals("1", result[1]); - 
assertEquals("2", result[2]); - - ZeppelinServer.notebook.removeNote(note.getId(), anonymous); + p.setText("%pyspark\nfrom pyspark.sql.functions import *\n" + + "print(" + sqlContextName + ".range(0, 10).withColumn('uniform', rand(seed=10) * 3.14).count())"); + p.setAuthenticationInfo(anonymous); + note.run(p.getId()); + waitForFinish(p); + assertEquals(Status.FINISHED, p.getStatus()); + assertEquals("10\n", p.getResult().message().get(0).getData()); + + ZeppelinServer.notebook.removeNote(note.getId(), anonymous); + } + + @Test + public void zRunTest() throws IOException { + // create new note + Note note = ZeppelinServer.notebook.createNote(anonymous); + Paragraph p0 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); + Map config0 = p0.getConfig(); + config0.put("enabled", true); + p0.setConfig(config0); + p0.setText("%spark z.run(1)"); + p0.setAuthenticationInfo(anonymous); + Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); + Map config1 = p1.getConfig(); + config1.put("enabled", true); + p1.setConfig(config1); + p1.setText("%spark val a=10"); + p1.setAuthenticationInfo(anonymous); + Paragraph p2 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); + Map config2 = p2.getConfig(); + config2.put("enabled", true); + p2.setConfig(config2); + p2.setText("%spark print(a)"); + p2.setAuthenticationInfo(anonymous); + + note.run(p0.getId()); + waitForFinish(p0); + assertEquals(Status.FINISHED, p0.getStatus()); + + // z.run is not blocking call. So p1 may not be finished when p0 is done. + waitForFinish(p1); + note.run(p2.getId()); + waitForFinish(p2); + assertEquals(Status.FINISHED, p2.getStatus()); + assertEquals("10", p2.getResult().message().get(0).getData()); + + Paragraph p3 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); + Map config3 = p3.getConfig(); + config3.put("enabled", true); + p3.setConfig(config3); + p3.setText("%spark println(new java.util.Date())"); + p3.setAuthenticationInfo(anonymous); + + p0.setText(String.format("%%spark z.runNote(\"%s\")", note.getId())); + note.run(p0.getId()); + waitForFinish(p0); + waitForFinish(p1); + waitForFinish(p2); + waitForFinish(p3); + + assertEquals(Status.FINISHED, p3.getStatus()); + String p3result = p3.getResult().message().get(0).getData(); + assertNotEquals(null, p3result); + assertNotEquals("", p3result); + + p0.setText(String.format("%%spark z.run(\"%s\", \"%s\")", note.getId(), p3.getId())); + p3.setText("%%spark println(\"END\")"); + + note.run(p0.getId()); + waitForFinish(p0); + waitForFinish(p3); + + assertNotEquals(p3result, p3.getResult().message()); + + ZeppelinServer.notebook.removeNote(note.getId(), anonymous); + } + + @Test + public void pySparkDepLoaderTest() throws IOException, InterpreterException { + // create new note + Note note = ZeppelinServer.notebook.createNote(anonymous); + + // restart spark interpreter + List<InterpreterSetting> settings = + ZeppelinServer.notebook.getBindedInterpreterSettings(note.getId()); + + for (InterpreterSetting setting : settings) { + if (setting.getName().equals("spark")) { + ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId()); + break; + } } - @Test - public void testConfInterpreter() throws IOException { - ZeppelinServer.notebook.getInterpreterSettingManager().close(); - Note note = ZeppelinServer.notebook.createNote(AuthenticationInfo.ANONYMOUS); - Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - Map config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); - p.setText("%spark.conf 
-    p.setText("%spark.conf spark.jars.packages\tcom.databricks:spark-csv_2.11:1.2.0");
-    p.setAuthenticationInfo(anonymous);
-    note.run(p.getId());
-    waitForFinish(p);
-    assertEquals(Status.FINISHED, p.getStatus());
-
-    Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-    p1.setConfig(config);
-    p1.setText("%spark\nimport com.databricks.spark.csv._");
-    p1.setAuthenticationInfo(anonymous);
-    note.run(p1.getId());
-
-    waitForFinish(p1);
-    assertEquals(Status.FINISHED, p1.getStatus());
-
-    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
-
+    // load dep
+    Paragraph p0 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    Map config = p0.getConfig();
+    config.put("enabled", true);
+    p0.setConfig(config);
+    p0.setText("%dep z.load(\"com.databricks:spark-csv_2.11:1.2.0\")");
+    p0.setAuthenticationInfo(anonymous);
+    note.run(p0.getId());
+    waitForFinish(p0);
+    assertEquals(Status.FINISHED, p0.getStatus());
+
+    // write test csv file
+    File tmpFile = File.createTempFile("test", "csv");
+    FileUtils.write(tmpFile, "a,b\n1,2");
+
+    // load data using libraries from dep loader
+    Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    p1.setConfig(config);
+
+    String sqlContextName = "sqlContext";
+    if (isSpark2()) {
+      sqlContextName = "spark";
     }
+    p1.setText("%pyspark\n" +
+        "from pyspark.sql import SQLContext\n" +
+        "print(" + sqlContextName + ".read.format('com.databricks.spark.csv')" +
+        ".load('" + tmpFile.getAbsolutePath() + "').count())");
+    p1.setAuthenticationInfo(anonymous);
+    note.run(p1.getId());
+
+    waitForFinish(p1);
+    assertEquals(Status.FINISHED, p1.getStatus());
+    assertEquals("2\n", p1.getResult().message().get(0).getData());
+
+    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
+  }
+
+  private void verifySparkVersionNumber() throws IOException {
+    Note note = ZeppelinServer.notebook.createNote(anonymous);
+    Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    note.setName("note");
+    Map config = p.getConfig();
+    config.put("enabled", true);
+    p.setConfig(config);
+    p.setText("%spark print(sc.version)");
+    p.setAuthenticationInfo(anonymous);
+    note.run(p.getId());
+    waitForFinish(p);
+    assertEquals(Status.FINISHED, p.getStatus());
+    assertEquals(sparkVersion, p.getResult().message().get(0).getData());
+  }
+
+  private int toIntSparkVersion(String sparkVersion) {
+    String[] split = sparkVersion.split("\\.");
+    int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
+    return version;
+  }
+
+  private boolean isSpark2() {
+    return toIntSparkVersion(sparkVersion) >= 20;
+  }
+
+  @Test
+  public void testSparkZeppelinContextDynamicForms() throws IOException {
+    Note note = ZeppelinServer.notebook.createNote(anonymous);
+    Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    note.setName("note");
+    Map config = p.getConfig();
+    config.put("enabled", true);
+    p.setConfig(config);
+    String code = "%spark.spark println(z.textbox(\"my_input\", \"default_name\"))\n" +
+        "println(z.select(\"my_select\", \"1\"," +
+        "Seq((\"1\", \"select_1\"), (\"2\", \"select_2\"))))\n" +
+        "val items=z.checkbox(\"my_checkbox\", Seq(\"2\"), " +
+        "Seq((\"1\", \"check_1\"), (\"2\", \"check_2\")))\n" +
+        "println(items(0))";
+    p.setText(code);
+    p.setAuthenticationInfo(anonymous);
+    note.run(p.getId());
+    waitForFinish(p);
+
+    assertEquals(Status.FINISHED, p.getStatus());
+    Iterator<String> formIter = p.settings.getForms().keySet().iterator();
+    assert (formIter.next().equals("my_input"));
+    assert (formIter.next().equals("my_select"));
+    assert (formIter.next().equals("my_checkbox"));
+
+    // check dynamic forms values
+    String[] result = p.getResult().message().get(0).getData().split("\n");
+    assertEquals(4, result.length);
+    assertEquals("default_name", result[0]);
+    assertEquals("1", result[1]);
+    assertEquals("items: Seq[Object] = Buffer(2)", result[2]);
+    assertEquals("2", result[3]);
+
+    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
+  }
+
+  @Test
+  public void testPySparkZeppelinContextDynamicForms() throws IOException {
+    Note note = ZeppelinServer.notebook.createNote(anonymous);
+    Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    note.setName("note");
+    Map config = p.getConfig();
+    config.put("enabled", true);
+    p.setConfig(config);
+    String code = "%spark.pyspark print(z.input('my_input', 'default_name'))\n" +
+        "print(z.select('my_select', " +
+        "[('1', 'select_1'), ('2', 'select_2')], defaultValue='1'))\n" +
+        "items=z.checkbox('my_checkbox', " +
+        "[('1', 'check_1'), ('2', 'check_2')], defaultChecked=['2'])\n" +
+        "print(items[0])";
+    p.setText(code);
+    p.setAuthenticationInfo(anonymous);
+    note.run(p.getId());
+    waitForFinish(p);
+
+    assertEquals(Status.FINISHED, p.getStatus());
+    Iterator<String> formIter = p.settings.getForms().keySet().iterator();
+    assert (formIter.next().equals("my_input"));
+    assert (formIter.next().equals("my_select"));
+    assert (formIter.next().equals("my_checkbox"));
+
+    // check dynamic forms values
+    String[] result = p.getResult().message().get(0).getData().split("\n");
+    assertEquals(3, result.length);
+    assertEquals("default_name", result[0]);
+    assertEquals("1", result[1]);
+    assertEquals("2", result[2]);
+
+    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
+  }
+
+  @Test
+  public void testConfInterpreter() throws IOException {
+    ZeppelinServer.notebook.getInterpreterSettingManager().close();
+    Note note = ZeppelinServer.notebook.createNote(AuthenticationInfo.ANONYMOUS);
+    Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    Map config = p.getConfig();
+    config.put("enabled", true);
+    p.setConfig(config);
+    p.setText("%spark.conf spark.jars.packages\tcom.databricks:spark-csv_2.11:1.2.0");
+    p.setAuthenticationInfo(anonymous);
+    note.run(p.getId());
+    waitForFinish(p);
+    assertEquals(Status.FINISHED, p.getStatus());
+
+    Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    p1.setConfig(config);
+    p1.setText("%spark\nimport com.databricks.spark.csv._");
+    p1.setAuthenticationInfo(anonymous);
+    note.run(p1.getId());
+
+    waitForFinish(p1);
+    assertEquals(Status.FINISHED, p1.getStatus());
+
+    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
+
+  }
 }
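Editor's note: the hunks above replace the old per-test version probe (getSparkVersionNumber, which ran "%spark print(sc.version)" inside each test) with a sparkVersion field that is asserted once in verifySparkVersionNumber and encoded by toIntSparkVersion ("2.2.0" becomes 22) so that isSpark2() can gate version-specific behaviour such as sqlContext vs spark. How sparkVersion is injected is not visible in this excerpt; the following is only a minimal sketch of one way to run the same suite once per Spark version with a JUnit 4 parameterized runner (class and wiring names are illustrative, not the actual ZeppelinSparkClusterTest setup):

    import java.util.Arrays;
    import java.util.List;

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;

    import static org.junit.Assert.assertEquals;

    // Sketch: run the same test suite once per Spark version.
    @RunWith(Parameterized.class)
    public class SparkVersionedTestSketch {

      @Parameterized.Parameters
      public static List<Object[]> versions() {
        return Arrays.asList(new Object[][]{{"2.2.0"}, {"2.1.0"}, {"1.6.3"}});
      }

      private final String sparkVersion;

      public SparkVersionedTestSketch(String sparkVersion) {
        this.sparkVersion = sparkVersion;
        // Hypothetical wiring: fetch (or reuse) the matching distribution and
        // expose it to the interpreter; the real tests do this elsewhere.
        // System.setProperty("SPARK_HOME", SparkDownloadUtils.downloadSpark(sparkVersion));
      }

      // Same "major * 10 + minor" encoding as toIntSparkVersion in the diff above.
      private int toIntSparkVersion(String version) {
        String[] split = version.split("\\.");
        return Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
      }

      private boolean isSpark2() {
        return toIntSparkVersion(sparkVersion) >= 20;
      }

      @Test
      public void versionEncoding() {
        assertEquals(22, toIntSparkVersion("2.2.0"));
        assertEquals(16, toIntSparkVersion("1.6.3"));
        assertEquals(sparkVersion.startsWith("2"), isSpark2());
      }
    }

The integer encoding keeps version branching to simple comparisons (>= 20 for Spark 2.x) instead of string parsing scattered through the tests.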
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6cdd56d/zeppelin-server/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/resources/log4j.properties b/zeppelin-server/src/test/resources/log4j.properties
index 8368993..aff3b79 100644
--- a/zeppelin-server/src/test/resources/log4j.properties
+++ b/zeppelin-server/src/test/resources/log4j.properties
@@ -42,5 +42,4 @@ log4j.logger.DataNucleus.Datastore=ERROR
 
 # Log all JDBC parameters
 log4j.logger.org.hibernate.type=ALL
-log4j.logger.org.apache.zeppelin.interpreter=DEBUG
-log4j.logger.org.apache.zeppelin.spark=DEBUG
+log4j.logger.org.apache.hadoop=WARN
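Editor's note: this config change trades Zeppelin's own DEBUG loggers for a WARN cap on Hadoop, which keeps CI log volume manageable now that the suite drives real Spark/YARN processes. When chasing interpreter failures locally, restoring the two removed lines is usually enough:

    # Re-enable locally when debugging interpreter or spark test failures;
    # these were dropped from the CI config to keep build logs small.
    log4j.logger.org.apache.zeppelin.interpreter=DEBUG
    log4j.logger.org.apache.zeppelin.spark=DEBUG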
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6cdd56d/zeppelin-zengine/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml
index 81ce716..cdb11dd 100644
--- a/zeppelin-zengine/pom.xml
+++ b/zeppelin-zengine/pom.xml
@@ -391,6 +391,20 @@
           </systemProperties>
         </configuration>
       </plugin>
+
+      <!-- publish test jar as well so that zeppelin-server module can use it -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>3.0.2</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
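Editor's note: the test-jar execution above makes zeppelin-zengine publish a second artifact containing its test classes (including SparkDownloadUtils below), so other modules can reuse them. On the consuming side this is presumably declared as a test-scoped test-jar dependency; the actual zeppelin-server/pom.xml hunk is not shown in this excerpt, so the snippet below is only a sketch of the standard Maven form:

    <!-- sketch: how a module such as zeppelin-server can depend on the
         published test jar of zeppelin-zengine -->
    <dependency>
      <groupId>org.apache.zeppelin</groupId>
      <artifactId>zeppelin-zengine</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>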
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6cdd56d/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java
new file mode 100644
index 0000000..e027bb0
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java
@@ -0,0 +1,91 @@
+package org.apache.zeppelin.interpreter;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+
+public class SparkDownloadUtils {
+  private static Logger LOGGER = LoggerFactory.getLogger(SparkDownloadUtils.class);
+
+  private static String downloadFolder = System.getProperty("user.home") + "/.cache/spark";
+
+  static {
+    try {
+      FileUtils.forceMkdir(new File(downloadFolder));
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to create downloadFolder: " + downloadFolder, e);
+    }
+  }
+
+  public static String downloadSpark(String version) {
+    File targetSparkHomeFolder = new File(downloadFolder + "/spark-" + version + "-bin-hadoop2.6");
+    if (targetSparkHomeFolder.exists()) {
+      LOGGER.info("Skip downloading spark as it is already downloaded.");
+      return targetSparkHomeFolder.getAbsolutePath();
+    }
+    // Try mirrors a few times until one succeeds
+    for (int i = 0; i < 3; i++) {
+      try {
+        String preferredMirror = IOUtils.toString(new URL("https://www.apache.org/dyn/closer.lua?preferred=true"));
+        File downloadFile = new File(downloadFolder + "/spark-" + version + "-bin-hadoop2.6.tgz");
+        String downloadURL = preferredMirror + "/spark/spark-" + version + "/spark-" + version + "-bin-hadoop2.6.tgz";
+        runShellCommand(new String[] {"wget", downloadURL, "-P", downloadFolder});
+        runShellCommand(new String[] {"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", downloadFolder});
+        break;
+      } catch (Exception e) {
+        LOGGER.warn("Failed to download Spark", e);
+      }
+    }
+    return targetSparkHomeFolder.getAbsolutePath();
+  }
+
+  private static void runShellCommand(String[] commands) throws IOException, InterruptedException {
+    LOGGER.info("Starting shell commands: " + StringUtils.join(commands, " "));
+    Process process = Runtime.getRuntime().exec(commands);
+    StreamGobbler errorGobbler = new StreamGobbler(process.getErrorStream());
+    StreamGobbler outputGobbler = new StreamGobbler(process.getInputStream());
+    errorGobbler.start();
+    outputGobbler.start();
+    if (process.waitFor() != 0) {
+      throw new IOException("Failed to run shell commands: " + StringUtils.join(commands, " "));
+    }
+    LOGGER.info("Completed shell commands: " + StringUtils.join(commands, " "));
+  }
+
+  private static class StreamGobbler extends Thread {
+    InputStream is;
+
+    // Reads everything from is until it is drained, so the child process
+    // cannot block on a full stdout/stderr pipe.
+    StreamGobbler(InputStream is) {
+      this.is = is;
+    }
+
+    public void run() {
+      try {
+        InputStreamReader isr = new InputStreamReader(is);
+        BufferedReader br = new BufferedReader(isr);
+        String line = null;
+        long startTime = System.currentTimeMillis();
+        while ((line = br.readLine()) != null) {
+          // log at most one line per 5 seconds to keep CI output small
+          if ((System.currentTimeMillis() - startTime) > 5000) {
+            LOGGER.info(line);
+            startTime = System.currentTimeMillis();
+          }
+        }
+      } catch (IOException ioe) {
+        ioe.printStackTrace();
+      }
+    }
+  }
+}
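Editor's note: SparkDownloadUtils caches each distribution under ~/.cache/spark, so repeated CI jobs and local runs pay the download cost only once per version. A minimal sketch of the intended call pattern follows; how the returned path is handed to the interpreter (e.g. as SPARK_HOME) lives in SparkIntegrationTest, which is not part of this excerpt, so the class below is illustrative only:

    package org.apache.zeppelin.interpreter;

    // Sketch only: demonstrates the call pattern, not the real test setup.
    public class SparkDownloadExample {
      public static void main(String[] args) {
        // The first call fetches spark-2.2.0-bin-hadoop2.6.tgz from a preferred
        // Apache mirror and untars it under ~/.cache/spark; later calls simply
        // return the cached folder.
        String sparkHome = SparkDownloadUtils.downloadSpark("2.2.0");
        System.out.println("Using SPARK_HOME=" + sparkHome);
        // A test would then pass this path to the Spark interpreter setting,
        // e.g. via a SPARK_HOME property on the interpreter.
      }
    }

Shelling out to wget and tar keeps the helper simple, at the cost of requiring both tools on the build machine, which holds for the Travis environment this targets.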