Repository: zeppelin
Updated Branches:
  refs/heads/master 55de3fba7 -> d4375977d
[ZEPPELIN-1585] Testcase for PySparkInterpreter.

### What is this PR for?
This PR adds a test case for PySparkInterpreter.

### What type of PR is it?
Improvement

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1585

### Questions:
- Do the license files need updating? no
- Are there breaking changes for older versions? no
- Does this need documentation? no

Author: astroshim <[email protected]>

Closes #1564 from astroshim/feat/pySparkInterpreterTest and squashes the following commits:

ea8a081 [astroshim] fix to use full package name.
5f33389 [astroshim] fix to use full package name.
9650077 [astroshim] apply spark.submit.pyFiles
5b39384 [astroshim] ignore testcase of spark 1.1 version
3c7abf0 [astroshim] Merge branch 'master' into feat/pySparkInterpreterTest
1cf3fae [astroshim] Merge branch 'master' into feat/pySparkInterpreterTest
265a82b [astroshim] change scope
51aa813 [astroshim] add PySparkInterpreter testcase.
3fe0c7e [astroshim] Merge branch 'master' into feat/pySparkInterpreterTest
499aa6b [astroshim] add PySparkInterpreter testcase


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/d4375977
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/d4375977
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/d4375977

Branch: refs/heads/master
Commit: d4375977ddf938f40d0f6af24c35c898f7e96985
Parents: 55de3fb
Author: astroshim <[email protected]>
Authored: Thu Nov 3 15:37:17 2016 +0900
Committer: Mina Lee <[email protected]>
Committed: Thu Nov 3 21:09:17 2016 +0900

----------------------------------------------------------------------
 .../zeppelin/spark/PySparkInterpreter.java      |  15 +-
 .../apache/zeppelin/spark/SparkInterpreter.java |   1 +
 .../zeppelin/spark/PySparkInterpreterTest.java  | 147 +++++++++++++++++++
 3 files changed, 159 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d4375977/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index c0b131c..420ebd5 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -49,9 +49,9 @@ import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
 import org.apache.zeppelin.interpreter.WrappedInterpreter;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.spark.dep.SparkDependencyContext;
 import org.slf4j.Logger;
@@ -165,6 +165,15 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     }
   }
 
+  private Map setupPySparkEnv() throws IOException{
+    Map env = EnvironmentUtils.getProcEnvironment();
+    if (!env.containsKey("PYTHONPATH")) {
+      SparkConf conf = getSparkConf();
+      env.put("PYTHONPATH", conf.get("spark.submit.pyFiles").replaceAll(",", ":"));
+    }
+    return env;
+  }
+
   private void createGatewayServerAndStartScript() {
     // create python script
     createPythonScript();
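A note on the setupPySparkEnv() change above: spark.submit.pyFiles holds a
comma-separated list of python library URIs, and replaceAll(",", ":") rewrites
it into the colon-separated form PYTHONPATH expects, but only when the parent
process did not already supply a PYTHONPATH. A minimal, self-contained sketch
of that fallback (the class name and sample URIs below are illustrative, not
part of the commit):

    import java.util.HashMap;
    import java.util.Map;

    public class PythonPathSketch {
      // Mirrors the fallback in setupPySparkEnv(): only fill PYTHONPATH
      // when the environment does not define it already.
      static void putPythonPathIfAbsent(Map<String, String> env, String pyFiles) {
        if (!env.containsKey("PYTHONPATH")) {
          // replaceAll() takes a regex; a bare comma matches itself literally.
          env.put("PYTHONPATH", pyFiles.replaceAll(",", ":"));
        }
      }

      public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        putPythonPathIfAbsent(env, "file:///tmp/pyspark.zip,file:///tmp/py4j-src.zip");
        // prints file:///tmp/pyspark.zip:file:///tmp/py4j-src.zip
        System.out.println(env.get("PYTHONPATH"));
      }
    }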
@@ -196,10 +205,8 @@
     executor.setStreamHandler(streamHandler);
 
     executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT));
-
     try {
-      Map env = EnvironmentUtils.getProcEnvironment();
-
+      Map env = setupPySparkEnv();
       executor.execute(cmd, env, this);
       pythonscriptRunning = true;
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d4375977/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 6aa2f28..53bf30b 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -504,6 +504,7 @@ public class SparkInterpreter extends Interpreter {
       conf.set("spark.files", conf.get("spark.yarn.dist.files"));
     }
     conf.set("spark.submit.pyArchives", Joiner.on(":").join(pythonLibs));
+    conf.set("spark.submit.pyFiles", Joiner.on(",").join(pythonLibUris));
   }
 
   // Distributes needed libraries to workers
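The one-line SparkInterpreter change above is what the new setupPySparkEnv()
reads back: the resolved python library URIs are now also published under
spark.submit.pyFiles, joined with commas (pyArchives keeps its colon
separator). A sketch of the round trip with hypothetical URI values, since
pythonLibUris is only resolved at runtime:

    import com.google.common.base.Joiner;

    import java.util.Arrays;
    import java.util.List;

    public class PyFilesRoundTripSketch {
      public static void main(String[] args) {
        // Hypothetical stand-ins for the pythonLibUris values resolved at runtime.
        List<String> pythonLibUris = Arrays.asList(
            "file:///opt/zeppelin/interpreter/spark/pyspark/pyspark.zip",
            "file:///opt/zeppelin/interpreter/spark/pyspark/py4j-src.zip");
        // Producer side (SparkInterpreter): comma-joined for spark.submit.pyFiles.
        String pyFiles = Joiner.on(",").join(pythonLibUris);
        // Consumer side (PySparkInterpreter): commas become PYTHONPATH colons.
        System.out.println(pyFiles.replaceAll(",", ":"));
      }
    }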
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d4375977/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
new file mode 100644
index 0000000..401e0fd
--- /dev/null
+++ b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterOutputListener;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.resource.LocalResourcePool;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Properties;
+
+import static org.junit.Assert.*;
+
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class PySparkInterpreterTest {
+  public static SparkInterpreter sparkInterpreter;
+  public static PySparkInterpreter pySparkInterpreter;
+  public static InterpreterGroup intpGroup;
+  private File tmpDir;
+  public static Logger LOGGER = LoggerFactory.getLogger(PySparkInterpreterTest.class);
+  private InterpreterContext context;
+
+  public static Properties getPySparkTestProperties() {
+    Properties p = new Properties();
+    p.setProperty("master", "local[*]");
+    p.setProperty("spark.app.name", "Zeppelin Test");
+    p.setProperty("zeppelin.spark.useHiveContext", "true");
+    p.setProperty("zeppelin.spark.maxResult", "1000");
+    p.setProperty("zeppelin.spark.importImplicit", "true");
+    p.setProperty("zeppelin.pyspark.python", "python");
+    return p;
+  }
+
+  /**
+   * Get spark version number as a numerical value.
+   * eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
+   */
+  public static int getSparkVersionNumber() {
+    if (sparkInterpreter == null) {
+      return 0;
+    }
+
+    String[] split = sparkInterpreter.getSparkContext().version().split("\\.");
+    int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
+    return version;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis());
+    System.setProperty("zeppelin.dep.localrepo", tmpDir.getAbsolutePath() + "/local-repo");
+    tmpDir.mkdirs();
+
+    intpGroup = new InterpreterGroup();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
+    if (sparkInterpreter == null) {
+      sparkInterpreter = new SparkInterpreter(getPySparkTestProperties());
+      intpGroup.get("note").add(sparkInterpreter);
+      sparkInterpreter.setInterpreterGroup(intpGroup);
+      sparkInterpreter.open();
+    }
+
+    if (pySparkInterpreter == null) {
+      pySparkInterpreter = new PySparkInterpreter(getPySparkTestProperties());
+      intpGroup.get("note").add(pySparkInterpreter);
+      pySparkInterpreter.setInterpreterGroup(intpGroup);
+      pySparkInterpreter.open();
+    }
+
+    context = new InterpreterContext("note", "id", "title", "text",
+        new AuthenticationInfo(),
+        new HashMap<String, Object>(),
+        new GUI(),
+        new AngularObjectRegistry(intpGroup.getId(), null),
+        new LocalResourcePool("id"),
+        new LinkedList<InterpreterContextRunner>(),
+        new InterpreterOutput(new InterpreterOutputListener() {
+          @Override
+          public void onAppend(InterpreterOutput out, byte[] line) {
+
+          }
+
+          @Override
+          public void onUpdate(InterpreterOutput out, byte[] output) {
+
+          }
+        }));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    delete(tmpDir);
+  }
+
+  private void delete(File file) {
+    if (file.isFile()) file.delete();
+    else if (file.isDirectory()) {
+      File[] files = file.listFiles();
+      if (files != null && files.length > 0) {
+        for (File f : files) {
+          delete(f);
+        }
+      }
+      file.delete();
+    }
+  }
+
+  @Test
+  public void testBasicIntp() {
+    if (getSparkVersionNumber() > 11) {
+      assertEquals(InterpreterResult.Code.SUCCESS,
+          pySparkInterpreter.interpret("a = 1\n", context).code());
+    }
+  }
+
+}
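One detail worth calling out in the new test: per the commit log entry
"ignore testcase of spark 1.1 version", testBasicIntp only runs when
getSparkVersionNumber() > 11, i.e. on Spark 1.2 or later, and the method
returns 0 (always skipping) when no SparkInterpreter is available. A minimal
sketch of the version-encoding scheme it uses (names here are illustrative):

    public class SparkVersionNumberSketch {
      // "major.minor.patch" -> major * 10 + minor, as in the test:
      // 1.1.x -> 11, 1.2.x -> 12, 1.6.2 -> 16, 2.0.0 -> 20.
      static int versionNumber(String version) {
        String[] split = version.split("\\.");
        return Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
      }

      public static void main(String[] args) {
        System.out.println(versionNumber("1.1.1")); // 11 -> test would be skipped
        System.out.println(versionNumber("1.6.2")); // 16 -> test runs
        System.out.println(versionNumber("2.0.0")); // 20 -> test runs
      }
    }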
