ZEPPELIN-2685. Improvement on Interpreter class ### What is this PR for? Follow up of #2577. Main changes on Interpreter * Add `throws InterpreterException`, which is a checked exception, to the abstract methods of `Interpreter`; this would enforce that interpreter implementations throw `InterpreterException`. * Field name refactoring.
* `property` -> `properties` * `getProperty()` --> `getProperties()` * Introduce a launcher layer for interpreter launching. Currently we only use a shell script to launch the interpreter, but any other service or component could launch it, such as a livy server or other 3rd party tools, or we may even create a separate module for the interpreter launcher * abstract class `InterpreterLauncher` * For now, there are only 2 implementations: `ShellScriptLauncher` & `SparkInterpreterLauncher`. We could add a method in class `Interpreter` to allow an interpreter to specify its own launcher class, but that could be future work. ### What type of PR is it? [Improvement | Refactoring] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-2685 ### How should this be tested? Unit test is covered. `ShellScriptLauncherTest` & `SparkInterpreterLauncherTest` ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #2592 from zjffdu/ZEPPELIN-2685 and squashes the following commits: 17dc2f1 [Jeff Zhang] address comments e545cc3 [Jeff Zhang] ZEPPELIN-2685. 
Improvement on Interpreter class Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/9812e26b Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/9812e26b Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/9812e26b Branch: refs/heads/master Commit: 9812e26bced75033b5795ee0fb8d8c08df9c83b5 Parents: ed8755d Author: Jeff Zhang <zjf...@apache.org> Authored: Fri Sep 22 15:00:00 2017 +0800 Committer: Jeff Zhang <zjf...@apache.org> Committed: Sat Oct 14 13:45:51 2017 +0800 ---------------------------------------------------------------------- bin/common.sh | 2 +- bin/interpreter.sh | 1 + docs/interpreter/spark.md | 6 + .../elasticsearch/ElasticsearchInterpreter.java | 8 +- .../apache/zeppelin/file/FileInterpreter.java | 3 +- .../zeppelin/file/HDFSFileInterpreter.java | 2 +- .../apache/zeppelin/flink/FlinkInterpreter.java | 8 +- .../org/apache/zeppelin/groovy/GObject.java | 6 +- .../zeppelin/groovy/GroovyInterpreter.java | 7 +- .../apache/zeppelin/hbase/HbaseInterpreter.java | 4 +- .../zeppelin/hbase/HbaseInterpreterTest.java | 3 +- .../apache/zeppelin/helium/DevInterpreter.java | 3 +- .../zeppelin/ignite/IgniteSqlInterpreter.java | 2 +- .../ignite/IgniteSqlInterpreterTest.java | 3 +- .../apache/zeppelin/jdbc/JDBCInterpreter.java | 25 +- .../zeppelin/jdbc/JDBCInterpreterTest.java | 3 +- .../zeppelin/livy/BaseLivyInterpreter.java | 12 +- .../zeppelin/livy/LivySparkSQLInterpreter.java | 4 +- .../apache/zeppelin/livy/LivyInterpreterIT.java | 23 +- .../org/apache/zeppelin/pig/PigInterpreter.java | 3 +- .../zeppelin/pig/PigQueryInterpreter.java | 4 +- .../zeppelin/pig/PigQueryInterpreterTest.java | 3 +- .../zeppelin/python/IPythonInterpreter.java | 33 +- .../zeppelin/python/PythonCondaInterpreter.java | 27 +- .../python/PythonDockerInterpreter.java | 25 +- .../zeppelin/python/PythonInterpreter.java | 23 +- .../python/PythonInterpreterPandasSql.java | 10 +- 
.../zeppelin/python/IPythonInterpreterTest.java | 13 +- .../python/PythonCondaInterpreterTest.java | 8 +- .../python/PythonDockerInterpreterTest.java | 6 +- .../python/PythonInterpreterMatplotlibTest.java | 9 +- .../python/PythonInterpreterPandasSqlTest.java | 13 +- .../zeppelin/python/PythonInterpreterTest.java | 9 +- .../zeppelin/scalding/ScaldingInterpreter.java | 8 +- .../apache/zeppelin/shell/ShellInterpreter.java | 4 +- .../zeppelin/spark/IPySparkInterpreter.java | 21 +- .../zeppelin/spark/PySparkInterpreter.java | 45 +- .../apache/zeppelin/spark/SparkInterpreter.java | 20 +- .../zeppelin/spark/SparkRInterpreter.java | 7 +- .../zeppelin/spark/SparkSqlInterpreter.java | 9 +- .../zeppelin/spark/SparkZeppelinContext.java | 8 +- .../org/apache/zeppelin/spark/ZeppelinR.java | 16 +- .../zeppelin/spark/IPySparkInterpreterTest.java | 5 +- .../spark/PySparkInterpreterMatplotlibTest.java | 12 +- .../zeppelin/spark/PySparkInterpreterTest.java | 15 +- .../zeppelin/spark/SparkInterpreterTest.java | 10 +- .../zeppelin/spark/SparkSqlInterpreterTest.java | 8 +- zeppelin-interpreter/pom.xml | 5 + .../zeppelin/conf/ZeppelinConfiguration.java | 847 +++++++++++++++++++ .../interpreter/BaseZeppelinContext.java | 14 +- .../interpreter/ClassloaderInterpreter.java | 45 +- .../zeppelin/interpreter/Interpreter.java | 51 +- .../interpreter/InterpreterException.java | 3 +- .../interpreter/LazyOpenInterpreter.java | 26 +- .../interpreter/launcher/InterpreterClient.java | 26 + .../launcher/InterpreterLaunchContext.java | 68 ++ .../launcher/InterpreterLauncher.java | 38 + .../remote/RemoteInterpreterContextRunner.java | 2 +- .../remote/RemoteInterpreterServer.java | 43 +- .../zeppelin/interpreter/InterpreterTest.java | 10 +- .../interpreter/LazyOpenInterpreterTest.java | 2 +- .../zeppelin/rest/InterpreterRestApi.java | 2 +- .../zeppelin/rest/ZeppelinSparkClusterTest.java | 5 +- zeppelin-zengine/pom.xml | 5 - .../zeppelin/conf/ZeppelinConfiguration.java | 847 ------------------- 
.../interpreter/InterpreterFactory.java | 2 +- .../interpreter/InterpreterSetting.java | 183 +--- .../interpreter/InterpreterSettingManager.java | 11 +- .../interpreter/ManagedInterpreterGroup.java | 9 +- .../launcher/ShellScriptLauncher.java | 82 ++ .../launcher/SparkInterpreterLauncher.java | 205 +++++ .../remote/InterpreterContextRunnerPool.java | 2 +- .../interpreter/remote/RemoteInterpreter.java | 82 +- .../remote/RemoteInterpreterManagedProcess.java | 56 +- .../remote/RemoteInterpreterProcess.java | 25 +- .../remote/RemoteInterpreterRunningProcess.java | 4 +- .../java/org/apache/zeppelin/notebook/Note.java | 2 +- .../org/apache/zeppelin/notebook/Notebook.java | 6 +- .../org/apache/zeppelin/notebook/Paragraph.java | 40 +- .../helium/HeliumApplicationFactoryTest.java | 3 +- .../zeppelin/interpreter/EchoInterpreter.java | 2 +- .../interpreter/InterpreterFactoryTest.java | 4 +- .../InterpreterSettingManagerTest.java | 2 +- .../zeppelin/interpreter/SleepInterpreter.java | 2 +- .../interpreter/SparkInterpreterModeTest.java | 8 +- .../launcher/ShellScriptLauncherTest.java | 54 ++ .../launcher/SparkInterpreterLauncherTest.java | 160 ++++ .../remote/RemoteAngularObjectTest.java | 6 +- .../RemoteInterpreterOutputTestStream.java | 8 +- .../remote/RemoteInterpreterTest.java | 46 +- .../remote/mock/MockInterpreterA.java | 5 +- .../mock/MockInterpreterOutputStream.java | 3 +- .../apache/zeppelin/notebook/NotebookTest.java | 17 +- .../resource/DistributedResourcePoolTest.java | 6 +- .../zeppelin/scheduler/RemoteSchedulerTest.java | 13 +- 95 files changed, 2134 insertions(+), 1452 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/bin/common.sh ---------------------------------------------------------------------- diff --git a/bin/common.sh b/bin/common.sh index d425cb1..6447ec8 100644 --- a/bin/common.sh +++ b/bin/common.sh @@ -122,7 +122,7 @@ JAVA_OPTS+=" 
-Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties" export JAVA_OPTS JAVA_INTP_OPTS="${ZEPPELIN_INTP_JAVA_OPTS} -Dfile.encoding=${ZEPPELIN_ENCODING}" -if [[ -z "${SPARK_YARN_CLUSTER}" ]]; then +if [[ -z "${ZEPPELIN_SPARK_YARN_CLUSTER}" ]]; then JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties" else JAVA_INTP_OPTS+=" -Dlog4j.configuration=log4j_yarn_cluster.properties" http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/bin/interpreter.sh ---------------------------------------------------------------------- diff --git a/bin/interpreter.sh b/bin/interpreter.sh index 5245e25..d27b076 100755 --- a/bin/interpreter.sh +++ b/bin/interpreter.sh @@ -148,6 +148,7 @@ if [[ "${INTERPRETER_ID}" == "spark" ]]; then if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}" + export HADOOP_CONF_DIR=${HADOOP_CONF_DIR} else # autodetect HADOOP_CONF_HOME by heuristic if [[ -n "${HADOOP_HOME}" ]] && [[ -z "${HADOOP_CONF_DIR}" ]]; then http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/docs/interpreter/spark.md ---------------------------------------------------------------------- diff --git a/docs/interpreter/spark.md b/docs/interpreter/spark.md index be5b3e5..bbd9065 100644 --- a/docs/interpreter/spark.md +++ b/docs/interpreter/spark.md @@ -181,6 +181,7 @@ For example, * **local[*]** in local mode * **spark://master:7077** in standalone cluster * **yarn-client** in Yarn client mode + * **yarn-cluster** in Yarn cluster mode * **mesos://host:5050** in Mesos cluster That's it. Zeppelin will work with any version of Spark and any deployment type without rebuilding Zeppelin in this way. @@ -188,6 +189,11 @@ For the further information about Spark & Zeppelin version compatibility, please > Note that without exporting `SPARK_HOME`, it's running in local mode with > included version of Spark. The included version may vary depending on the > build profile. 
+### 3. Yarn mode +Zeppelin support both yarn client and yarn cluster mode (yarn cluster mode is supported from 0.8.0). For yarn mode, you must specify `SPARK_HOME` & `HADOOP_CONF_DIR`. +You can either specify them in `zeppelin-env.sh`, or in interpreter setting page. Specifying them in `zeppelin-env.sh` means you can use only one version of `spark` & `hadoop`. Specifying them +in interpreter setting page means you can use multiple versions of `spark` & `hadoop` in one zeppelin instance. + ## SparkContext, SQLContext, SparkSession, ZeppelinContext SparkContext, SQLContext and ZeppelinContext are automatically created and exposed as variable names `sc`, `sqlContext` and `z`, respectively, in Scala, Python and R environments. Staring from 0.6.1 SparkSession is available as variable `spark` when you are using Spark 2.x. http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java index 33448df..6251b92 100644 --- a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java +++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java @@ -112,7 +112,7 @@ public class ElasticsearchInterpreter extends Interpreter { @Override public void open() { - logger.info("Properties: {}", getProperty()); + logger.info("Properties: {}", getProperties()); String clientType = getProperty(ELASTICSEARCH_CLIENT_TYPE); clientType = clientType == null ? 
null : clientType.toLowerCase(); @@ -123,15 +123,15 @@ public class ElasticsearchInterpreter extends Interpreter { catch (final NumberFormatException e) { this.resultSize = 10; logger.error("Unable to parse " + ELASTICSEARCH_RESULT_SIZE + " : " + - property.get(ELASTICSEARCH_RESULT_SIZE), e); + getProperty(ELASTICSEARCH_RESULT_SIZE), e); } try { if (StringUtils.isEmpty(clientType) || "transport".equals(clientType)) { - elsClient = new TransportBasedClient(getProperty()); + elsClient = new TransportBasedClient(getProperties()); } else if ("http".equals(clientType)) { - elsClient = new HttpBasedClient(getProperty()); + elsClient = new HttpBasedClient(getProperties()); } else { logger.error("Unknown type of Elasticsearch client: " + clientType); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/file/src/main/java/org/apache/zeppelin/file/FileInterpreter.java ---------------------------------------------------------------------- diff --git a/file/src/main/java/org/apache/zeppelin/file/FileInterpreter.java b/file/src/main/java/org/apache/zeppelin/file/FileInterpreter.java index d7aad19..cf83672 100644 --- a/file/src/main/java/org/apache/zeppelin/file/FileInterpreter.java +++ b/file/src/main/java/org/apache/zeppelin/file/FileInterpreter.java @@ -20,6 +20,7 @@ package org.apache.zeppelin.file; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.InterpreterResult.Type; @@ -86,7 +87,7 @@ public abstract class FileInterpreter extends Interpreter { // Functions that each file system implementation must override - public abstract String listAll(String path); + public abstract String listAll(String path) throws InterpreterException; public abstract boolean isDirectory(String path); 
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java ---------------------------------------------------------------------- diff --git a/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java b/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java index 244101c..d715ed9 100644 --- a/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java +++ b/file/src/main/java/org/apache/zeppelin/file/HDFSFileInterpreter.java @@ -202,7 +202,7 @@ public class HDFSFileInterpreter extends FileInterpreter { return "No such File or directory"; } - public String listAll(String path) { + public String listAll(String path) throws InterpreterException { String all = ""; if (exceptionOnConnect != null) return "Error connecting to provided endpoint."; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java ---------------------------------------------------------------------- diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java index 710eace..19c77de 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java @@ -17,7 +17,6 @@ */ package org.apache.zeppelin.flink; -import java.lang.reflect.InvocationTargetException; import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.File; @@ -34,10 +33,8 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; -import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.zeppelin.interpreter.Interpreter; import 
org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.InterpreterUtils; @@ -46,11 +43,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Console; -import scala.None; -import scala.Option; import scala.Some; import scala.collection.JavaConversions; -import scala.collection.immutable.Nil; import scala.concurrent.duration.FiniteDuration; import scala.runtime.AbstractFunction0; import scala.tools.nsc.Settings; @@ -80,7 +74,7 @@ public class FlinkInterpreter extends Interpreter { public void open() { out = new ByteArrayOutputStream(); flinkConf = new org.apache.flink.configuration.Configuration(); - Properties intpProperty = getProperty(); + Properties intpProperty = getProperties(); for (Object k : intpProperty.keySet()) { String key = (String) k; String val = toString(intpProperty.get(key)); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/groovy/src/main/java/org/apache/zeppelin/groovy/GObject.java ---------------------------------------------------------------------- diff --git a/groovy/src/main/java/org/apache/zeppelin/groovy/GObject.java b/groovy/src/main/java/org/apache/zeppelin/groovy/GObject.java index ce7d00f..babda8f 100644 --- a/groovy/src/main/java/org/apache/zeppelin/groovy/GObject.java +++ b/groovy/src/main/java/org/apache/zeppelin/groovy/GObject.java @@ -316,12 +316,12 @@ public class GObject extends groovy.lang.GroovyObjectSupport { @ZeppelinApi public void run(String noteId, String paragraphId, InterpreterContext context) { if (paragraphId.equals(context.getParagraphId())) { - throw new InterpreterException("Can not run current Paragraph"); + throw new RuntimeException("Can not run current Paragraph"); } List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, paragraphId, 
context); if (runners.size() <= 0) { - throw new InterpreterException("Paragraph " + paragraphId + " not found " + runners.size()); + throw new RuntimeException("Paragraph " + paragraphId + " not found " + runners.size()); } for (InterpreterContextRunner r : runners) { r.run(); @@ -338,7 +338,7 @@ public class GObject extends groovy.lang.GroovyObjectSupport { List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, context); if (runners.size() <= 0) { - throw new InterpreterException("Note " + noteId + " not found " + runners.size()); + throw new RuntimeException("Note " + noteId + " not found " + runners.size()); } for (InterpreterContextRunner r : runners) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/groovy/src/main/java/org/apache/zeppelin/groovy/GroovyInterpreter.java ---------------------------------------------------------------------- diff --git a/groovy/src/main/java/org/apache/zeppelin/groovy/GroovyInterpreter.java b/groovy/src/main/java/org/apache/zeppelin/groovy/GroovyInterpreter.java index 8c83ef7..01e97e6 100644 --- a/groovy/src/main/java/org/apache/zeppelin/groovy/GroovyInterpreter.java +++ b/groovy/src/main/java/org/apache/zeppelin/groovy/GroovyInterpreter.java @@ -17,8 +17,6 @@ package org.apache.zeppelin.groovy; -import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.io.StringWriter; import java.io.PrintWriter; import java.io.File; @@ -26,10 +24,8 @@ import java.util.*; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.InterpreterResult.Type; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Scheduler; @@ 
-40,7 +36,6 @@ import org.slf4j.LoggerFactory; import groovy.lang.GroovyShell; import groovy.lang.Script; import org.codehaus.groovy.control.CompilerConfiguration; -import org.codehaus.groovy.runtime.ResourceGroovyMethods; import org.codehaus.groovy.runtime.StackTraceUtils; import java.util.concurrent.ConcurrentHashMap; @@ -167,7 +162,7 @@ public class GroovyInterpreter extends Interpreter { //put shared bindings evaluated in this interpreter bindings.putAll(sharedBindings); //put predefined bindings - bindings.put("g", new GObject(log, out, this.getProperty(), contextInterpreter, bindings)); + bindings.put("g", new GObject(log, out, this.getProperties(), contextInterpreter, bindings)); bindings.put("out", new PrintWriter(out, true)); script.run(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/hbase/src/main/java/org/apache/zeppelin/hbase/HbaseInterpreter.java ---------------------------------------------------------------------- diff --git a/hbase/src/main/java/org/apache/zeppelin/hbase/HbaseInterpreter.java b/hbase/src/main/java/org/apache/zeppelin/hbase/HbaseInterpreter.java index 74d3ed1..63c1928 100644 --- a/hbase/src/main/java/org/apache/zeppelin/hbase/HbaseInterpreter.java +++ b/hbase/src/main/java/org/apache/zeppelin/hbase/HbaseInterpreter.java @@ -68,7 +68,7 @@ public class HbaseInterpreter extends Interpreter { } @Override - public void open() { + public void open() throws InterpreterException { this.scriptingContainer = new ScriptingContainer(LocalContextScope.SINGLETON); this.writer = new StringWriter(); scriptingContainer.setOutput(this.writer); @@ -88,7 +88,7 @@ public class HbaseInterpreter extends Interpreter { } logger.info("Absolute Ruby Source:" + abs_ruby_src.toString()); - // hirb.rb:41 requires the following system property to be set. + // hirb.rb:41 requires the following system properties to be set. 
Properties sysProps = System.getProperties(); sysProps.setProperty(HBASE_RUBY_SRC, abs_ruby_src.toString()); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/hbase/src/test/java/org/apache/zeppelin/hbase/HbaseInterpreterTest.java ---------------------------------------------------------------------- diff --git a/hbase/src/test/java/org/apache/zeppelin/hbase/HbaseInterpreterTest.java b/hbase/src/test/java/org/apache/zeppelin/hbase/HbaseInterpreterTest.java index 38a8b4d..53040f9 100644 --- a/hbase/src/test/java/org/apache/zeppelin/hbase/HbaseInterpreterTest.java +++ b/hbase/src/test/java/org/apache/zeppelin/hbase/HbaseInterpreterTest.java @@ -15,6 +15,7 @@ package org.apache.zeppelin.hbase; import org.apache.log4j.BasicConfigurator; +import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; import org.junit.BeforeClass; import org.junit.Test; @@ -35,7 +36,7 @@ public class HbaseInterpreterTest { private static HbaseInterpreter hbaseInterpreter; @BeforeClass - public static void setUp() throws NullPointerException { + public static void setUp() throws NullPointerException, InterpreterException { BasicConfigurator.configure(); Properties properties = new Properties(); properties.put("hbase.home", ""); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/helium-dev/src/main/java/org/apache/zeppelin/helium/DevInterpreter.java ---------------------------------------------------------------------- diff --git a/helium-dev/src/main/java/org/apache/zeppelin/helium/DevInterpreter.java b/helium-dev/src/main/java/org/apache/zeppelin/helium/DevInterpreter.java index 7d1c361..ba1a564 100644 --- a/helium-dev/src/main/java/org/apache/zeppelin/helium/DevInterpreter.java +++ b/helium-dev/src/main/java/org/apache/zeppelin/helium/DevInterpreter.java @@ -74,7 +74,8 @@ public class DevInterpreter extends Interpreter { } @Override - public InterpreterResult interpret(String st, InterpreterContext context) { 
+ public InterpreterResult interpret(String st, InterpreterContext context) + throws InterpreterException { this.context = context; try { return interpreterEvent.interpret(st, context); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteSqlInterpreter.java ---------------------------------------------------------------------- diff --git a/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteSqlInterpreter.java b/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteSqlInterpreter.java index 41803bb..6af8eb5 100644 --- a/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteSqlInterpreter.java +++ b/ignite/src/main/java/org/apache/zeppelin/ignite/IgniteSqlInterpreter.java @@ -93,7 +93,7 @@ public class IgniteSqlInterpreter extends Interpreter { } @Override - public void close() { + public void close() throws InterpreterException { try { if (conn != null) { conn.close(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java ---------------------------------------------------------------------- diff --git a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java index b06d457..a6ae0ea 100644 --- a/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java +++ b/ignite/src/test/java/org/apache/zeppelin/ignite/IgniteSqlInterpreterTest.java @@ -27,6 +27,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import 
org.apache.zeppelin.interpreter.InterpreterResult.Type; @@ -82,7 +83,7 @@ public class IgniteSqlInterpreterTest { } @After - public void tearDown() { + public void tearDown() throws InterpreterException { intp.close(); ignite.close(); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java ---------------------------------------------------------------------- diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 2c5258c..f3dfddc 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -45,8 +45,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.alias.CredentialProvider; import org.apache.hadoop.security.alias.CredentialProviderFactory; -import org.apache.thrift.transport.TTransportException; -import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; @@ -172,7 +170,7 @@ public class JDBCInterpreter extends KerberosInterpreter { @Override public void open() { super.open(); - for (String propertyKey : property.stringPropertyNames()) { + for (String propertyKey : properties.stringPropertyNames()) { logger.debug("propertyKey: {}", propertyKey); String[] keyValue = propertyKey.split("\\.", 2); if (2 == keyValue.length) { @@ -185,7 +183,7 @@ public class JDBCInterpreter extends KerberosInterpreter { prefixProperties = new Properties(); basePropretiesMap.put(keyValue[0].trim(), prefixProperties); } - prefixProperties.put(keyValue[1].trim(), property.getProperty(propertyKey)); + prefixProperties.put(keyValue[1].trim(), getProperty(propertyKey)); } } @@ 
-211,8 +209,8 @@ public class JDBCInterpreter extends KerberosInterpreter { protected boolean isKerboseEnabled() { - if (!isEmpty(property.getProperty("zeppelin.jdbc.auth.type"))) { - UserGroupInformation.AuthenticationMethod authType = JDBCSecurityImpl.getAuthtype(property); + if (!isEmpty(getProperty("zeppelin.jdbc.auth.type"))) { + UserGroupInformation.AuthenticationMethod authType = JDBCSecurityImpl.getAuthtype(properties); if (authType.equals(KERBEROS)) { return true; } @@ -356,7 +354,7 @@ public class JDBCInterpreter extends KerberosInterpreter { } private void setUserProperty(String propertyKey, InterpreterContext interpreterContext) - throws SQLException, IOException { + throws SQLException, IOException, InterpreterException { String user = interpreterContext.getAuthenticationInfo().getUser(); @@ -424,18 +422,19 @@ public class JDBCInterpreter extends KerberosInterpreter { final Properties properties = jdbcUserConfigurations.getPropertyMap(propertyKey); final String url = properties.getProperty(URL_KEY); - if (isEmpty(property.getProperty("zeppelin.jdbc.auth.type"))) { + if (isEmpty(getProperty("zeppelin.jdbc.auth.type"))) { connection = getConnectionFromPool(url, user, propertyKey, properties); } else { - UserGroupInformation.AuthenticationMethod authType = JDBCSecurityImpl.getAuthtype(property); + UserGroupInformation.AuthenticationMethod authType = + JDBCSecurityImpl.getAuthtype(getProperties()); final String connectionUrl = appendProxyUserToURL(url, user, propertyKey); - JDBCSecurityImpl.createSecureConfiguration(property, authType); + JDBCSecurityImpl.createSecureConfiguration(getProperties(), authType); switch (authType) { case KERBEROS: if (user == null || "false".equalsIgnoreCase( - property.getProperty("zeppelin.jdbc.auth.kerberos.proxy.enable"))) { + getProperty("zeppelin.jdbc.auth.kerberos.proxy.enable"))) { connection = getConnectionFromPool(connectionUrl, user, propertyKey, properties); } else { if 
(basePropretiesMap.get(propertyKey).containsKey("proxy.user.property")) { @@ -497,7 +496,7 @@ public class JDBCInterpreter extends KerberosInterpreter { return connectionUrl.toString(); } - private String getPassword(Properties properties) throws IOException { + private String getPassword(Properties properties) throws IOException, InterpreterException { if (isNotEmpty(properties.getProperty(PASSWORD_KEY))) { return properties.getProperty(PASSWORD_KEY); } else if (isNotEmpty(properties.getProperty(JDBC_JCEKS_FILE)) @@ -850,7 +849,7 @@ public class JDBCInterpreter extends KerberosInterpreter { @Override public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { + InterpreterContext interpreterContext) throws InterpreterException { List<InterpreterCompletion> candidates = new ArrayList<>(); String propertyKey = getPropertyKey(buf); String sqlCompleterKey = http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java ---------------------------------------------------------------------- diff --git a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java index e6f9598..1b3f045 100644 --- a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java +++ b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java @@ -37,6 +37,7 @@ import java.util.Properties; import org.apache.zeppelin.completer.CompletionType; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.scheduler.FIFOScheduler; @@ -349,7 +350,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter { } @Test - public void testAutoCompletion() throws 
SQLException, IOException { + public void testAutoCompletion() throws SQLException, IOException, InterpreterException { Properties properties = new Properties(); properties.setProperty("common.max_count", "1000"); properties.setProperty("common.max_retry", "3"); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java index b725348..03a269e 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java @@ -131,7 +131,7 @@ public abstract class BaseLivyInterpreter extends Interpreter { public abstract String getSessionKind(); @Override - public void open() { + public void open() throws InterpreterException { try { initLivySession(); } catch (LivyException e) { @@ -228,7 +228,7 @@ public abstract class BaseLivyInterpreter extends Interpreter { throws LivyException { try { Map<String, String> conf = new HashMap<>(); - for (Map.Entry<Object, Object> entry : property.entrySet()) { + for (Map.Entry<Object, Object> entry : getProperties().entrySet()) { if (entry.getKey().toString().startsWith("livy.spark.") && !entry.getValue().toString().isEmpty()) conf.put(entry.getKey().toString().substring(5), entry.getValue().toString()); @@ -458,15 +458,15 @@ public abstract class BaseLivyInterpreter extends Interpreter { private RestTemplate createRestTemplate() { - String keytabLocation = property.getProperty("zeppelin.livy.keytab"); - String principal = property.getProperty("zeppelin.livy.principal"); + String keytabLocation = getProperty("zeppelin.livy.keytab"); + String principal = getProperty("zeppelin.livy.principal"); boolean isSpnegoEnabled = StringUtils.isNotEmpty(keytabLocation) && 
StringUtils.isNotEmpty(principal); HttpClient httpClient = null; if (livyURL.startsWith("https:")) { - String keystoreFile = property.getProperty("zeppelin.livy.ssl.trustStore"); - String password = property.getProperty("zeppelin.livy.ssl.trustStorePassword"); + String keystoreFile = getProperty("zeppelin.livy.ssl.trustStore"); + String password = getProperty("zeppelin.livy.ssl.trustStorePassword"); if (StringUtils.isBlank(keystoreFile)) { throw new RuntimeException("No zeppelin.livy.ssl.trustStore specified for livy ssl"); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java ---------------------------------------------------------------------- diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java index 20d0448..486e31c 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java @@ -59,7 +59,7 @@ public class LivySparkSQLInterpreter extends BaseLivyInterpreter { } @Override - public void open() { + public void open() throws InterpreterException { this.sparkInterpreter = getSparkInterpreter(); // As we don't know whether livyserver use spark2 or spark1, so we will detect SparkSession // to judge whether it is using spark2. 
@@ -93,7 +93,7 @@ public class LivySparkSQLInterpreter extends BaseLivyInterpreter { } } - private LivySparkInterpreter getSparkInterpreter() { + private LivySparkInterpreter getSparkInterpreter() throws InterpreterException { LazyOpenInterpreter lazy = null; LivySparkInterpreter spark = null; Interpreter p = getInterpreterInTheSameSessionByClassName(LivySparkInterpreter.class.getName()); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java ---------------------------------------------------------------------- diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java index 9a0aef4..d413134 100644 --- a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java +++ b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java @@ -75,8 +75,9 @@ public class LivyInterpreterIT { return true; } + // @Test - public void testSparkInterpreterRDD() { + public void testSparkInterpreterRDD() throws InterpreterException { if (!checkPreCondition()) { return; } @@ -195,8 +196,9 @@ public class LivyInterpreterIT { } } + // @Test - public void testSparkInterpreterDataFrame() { + public void testSparkInterpreterDataFrame() throws InterpreterException { if (!checkPreCondition()) { return; } @@ -284,7 +286,7 @@ public class LivyInterpreterIT { } // @Test - public void testSparkSQLInterpreter() { + public void testSparkSQLInterpreter() throws InterpreterException { if (!checkPreCondition()) { return; } @@ -319,7 +321,7 @@ public class LivyInterpreterIT { // @Test - public void testSparkSQLCancellation() { + public void testSparkSQLCancellation() throws InterpreterException { if (!checkPreCondition()) { return; } @@ -400,7 +402,7 @@ public class LivyInterpreterIT { } // @Test - public void testStringWithTruncation() { + public void testStringWithTruncation() throws InterpreterException { if 
(!checkPreCondition()) { return; } @@ -459,8 +461,9 @@ public class LivyInterpreterIT { } } + // @Test - public void testStringWithoutTruncation() { + public void testStringWithoutTruncation() throws InterpreterException { if (!checkPreCondition()) { return; } @@ -525,7 +528,7 @@ public class LivyInterpreterIT { } @Test - public void testPySparkInterpreter() throws LivyException { + public void testPySparkInterpreter() throws LivyException, InterpreterException { if (!checkPreCondition()) { return; } @@ -645,7 +648,7 @@ public class LivyInterpreterIT { } // @Test - public void testSparkInterpreterWithDisplayAppInfo() { + public void testSparkInterpreterWithDisplayAppInfo() throws InterpreterException { if (!checkPreCondition()) { return; } @@ -684,7 +687,7 @@ public class LivyInterpreterIT { } // @Test - public void testSparkRInterpreter() throws LivyException { + public void testSparkRInterpreter() throws LivyException, InterpreterException { if (!checkPreCondition()) { return; } @@ -756,7 +759,7 @@ public class LivyInterpreterIT { } // @Test - public void testLivyTutorialNote() throws IOException { + public void testLivyTutorialNote() throws IOException, InterpreterException { if (!checkPreCondition()) { return; } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java ---------------------------------------------------------------------- diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java index 8937416..0f2d59b 100644 --- a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java +++ b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java @@ -18,7 +18,6 @@ package org.apache.zeppelin.pig; import org.apache.commons.io.output.ByteArrayOutputStream; -import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.pig.PigServer; import 
org.apache.pig.impl.logicalLayer.FrontendException; @@ -60,7 +59,7 @@ public class PigInterpreter extends BasePigInterpreter { } try { pigServer = new PigServer(execType); - for (Map.Entry entry : getProperty().entrySet()) { + for (Map.Entry entry : getProperties().entrySet()) { if (!entry.getKey().toString().startsWith("zeppelin.")) { pigServer.getPigContext().getProperties().setProperty(entry.getKey().toString(), entry.getValue().toString()); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java ---------------------------------------------------------------------- diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java index d3bc432..da3d50e 100644 --- a/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java +++ b/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java @@ -55,7 +55,7 @@ public class PigQueryInterpreter extends BasePigInterpreter { } @Override - public void open() { + public void open() throws InterpreterException { pigServer = getPigInterpreter().getPigServer(); maxResult = Integer.parseInt(getProperty(MAX_RESULTS)); } @@ -159,7 +159,7 @@ public class PigQueryInterpreter extends BasePigInterpreter { return this.pigServer; } - private PigInterpreter getPigInterpreter() { + private PigInterpreter getPigInterpreter() throws InterpreterException { LazyOpenInterpreter lazy = null; PigInterpreter pig = null; Interpreter p = getInterpreterInTheSameSessionByClassName(PigInterpreter.class.getName()); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java ---------------------------------------------------------------------- diff --git a/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java b/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java index 
de297c7..f14cad6 100644 --- a/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java +++ b/pig/src/test/java/org/apache/zeppelin/pig/PigQueryInterpreterTest.java @@ -21,6 +21,7 @@ package org.apache.zeppelin.pig; import org.apache.commons.io.IOUtils; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterResult; import org.junit.After; @@ -48,7 +49,7 @@ public class PigQueryInterpreterTest { private InterpreterContext context; @Before - public void setUp() { + public void setUp() throws InterpreterException { Properties properties = new Properties(); properties.put("zeppelin.pig.execType", "local"); properties.put("zeppelin.pig.maxResult", "20"); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java ---------------------------------------------------------------------- diff --git a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java index 193c343..1c2ced5 100644 --- a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java +++ b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java @@ -30,6 +30,7 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; @@ -112,19 +113,20 @@ public class IPythonInterpreter extends Interpreter 
implements ExecuteResultHand } @Override - public void open() { + public void open() throws InterpreterException { try { if (ipythonClient != null) { // IPythonInterpreter might already been opened by PythonInterpreter return; } - pythonExecutable = getProperty().getProperty("zeppelin.python", "python"); + pythonExecutable = getProperty("zeppelin.python", "python"); LOGGER.info("Python Exec: " + pythonExecutable); + ipythonLaunchTimeout = Long.parseLong( - getProperty().getProperty("zeppelin.ipython.launch.timeout", "30000")); + getProperty("zeppelin.ipython.launch.timeout", "30000")); this.zeppelinContext = new PythonZeppelinContext( getInterpreterGroup().getInterpreterHookRegistry(), - Integer.parseInt(getProperty().getProperty("zeppelin.python.maxResult", "1000"))); + Integer.parseInt(getProperty("zeppelin.python.maxResult", "1000"))); int ipythonPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); int jvmGatewayPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); LOGGER.info("Launching IPython Kernel at port: " + ipythonPort); @@ -243,16 +245,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand } } - Map<String, String> envs = EnvironmentUtils.getProcEnvironment(); - if (envs.containsKey("PYTHONPATH")) { - if (additionalPythonPath != null) { - envs.put("PYTHONPATH", additionalPythonPath + ":" + envs.get("PYTHONPATH")); - } - } else { - envs.put("PYTHONPATH", additionalPythonPath); - } - - LOGGER.info("PYTHONPATH: " + envs.get("PYTHONPATH")); + Map<String, String> envs = setupIPythonEnv(); executor.execute(cmd, envs, this); // wait until IPython kernel is started or timeout @@ -284,6 +277,18 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand } } + protected Map<String, String> setupIPythonEnv() throws IOException { + Map<String, String> envs = EnvironmentUtils.getProcEnvironment(); + if (envs.containsKey("PYTHONPATH")) { + if (additionalPythonPath != 
null) { + envs.put("PYTHONPATH", additionalPythonPath + ":" + envs.get("PYTHONPATH")); + } + } else { + envs.put("PYTHONPATH", additionalPythonPath); + } + return envs; + } + @Override public void close() { if (watchDog != null) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/python/src/main/java/org/apache/zeppelin/python/PythonCondaInterpreter.java ---------------------------------------------------------------------- diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonCondaInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonCondaInterpreter.java index 0d122f5..887beb8 100644 --- a/python/src/main/java/org/apache/zeppelin/python/PythonCondaInterpreter.java +++ b/python/src/main/java/org/apache/zeppelin/python/PythonCondaInterpreter.java @@ -67,7 +67,8 @@ public class PythonCondaInterpreter extends Interpreter { } @Override - public InterpreterResult interpret(String st, InterpreterContext context) { + public InterpreterResult interpret(String st, InterpreterContext context) + throws InterpreterException { InterpreterOutput out = context.out; Matcher activateMatcher = PATTERN_COMMAND_ACTIVATE.matcher(st); Matcher createMatcher = PATTERN_COMMAND_CREATE.matcher(st); @@ -126,7 +127,7 @@ public class PythonCondaInterpreter extends Interpreter { } private void changePythonEnvironment(String envName) - throws IOException, InterruptedException { + throws IOException, InterruptedException, InterpreterException { PythonInterpreter python = getPythonInterpreter(); String binPath = null; if (envName == null) { @@ -147,13 +148,13 @@ public class PythonCondaInterpreter extends Interpreter { python.setPythonCommand(binPath); } - private void restartPythonProcess() { + private void restartPythonProcess() throws InterpreterException { PythonInterpreter python = getPythonInterpreter(); python.close(); python.open(); } - protected PythonInterpreter getPythonInterpreter() { + protected PythonInterpreter getPythonInterpreter() 
throws InterpreterException { LazyOpenInterpreter lazy = null; PythonInterpreter python = null; Interpreter p = @@ -213,7 +214,7 @@ public class PythonCondaInterpreter extends Interpreter { } private InterpreterResult runCondaActivate(String envName) - throws IOException, InterruptedException { + throws IOException, InterruptedException, InterpreterException { if (null == envName || envName.isEmpty()) { return new InterpreterResult(Code.ERROR, "Env name should be specified"); @@ -226,7 +227,7 @@ public class PythonCondaInterpreter extends Interpreter { } private InterpreterResult runCondaDeactivate() - throws IOException, InterruptedException { + throws IOException, InterruptedException, InterpreterException { changePythonEnvironment(null); restartPythonProcess(); @@ -375,10 +376,16 @@ public class PythonCondaInterpreter extends Interpreter { */ @Override public Scheduler getScheduler() { - PythonInterpreter pythonInterpreter = getPythonInterpreter(); - if (pythonInterpreter != null) { - return pythonInterpreter.getScheduler(); - } else { + PythonInterpreter pythonInterpreter = null; + try { + pythonInterpreter = getPythonInterpreter(); + if (pythonInterpreter != null) { + return pythonInterpreter.getScheduler(); + } else { + return null; + } + } catch (InterpreterException e) { + e.printStackTrace(); return null; } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/python/src/main/java/org/apache/zeppelin/python/PythonDockerInterpreter.java ---------------------------------------------------------------------- diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonDockerInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonDockerInterpreter.java index cb0f620..22f6c2e 100644 --- a/python/src/main/java/org/apache/zeppelin/python/PythonDockerInterpreter.java +++ b/python/src/main/java/org/apache/zeppelin/python/PythonDockerInterpreter.java @@ -56,7 +56,8 @@ public class PythonDockerInterpreter extends Interpreter { } 
@Override - public InterpreterResult interpret(String st, InterpreterContext context) { + public InterpreterResult interpret(String st, InterpreterContext context) + throws InterpreterException { File pythonScript = new File(getPythonInterpreter().getScriptPath()); InterpreterOutput out = context.out; @@ -105,7 +106,7 @@ public class PythonDockerInterpreter extends Interpreter { } - public void setPythonCommand(String cmd) { + public void setPythonCommand(String cmd) throws InterpreterException { PythonInterpreter python = getPythonInterpreter(); python.setPythonCommand(cmd); } @@ -140,21 +141,27 @@ public class PythonDockerInterpreter extends Interpreter { */ @Override public Scheduler getScheduler() { - PythonInterpreter pythonInterpreter = getPythonInterpreter(); - if (pythonInterpreter != null) { - return pythonInterpreter.getScheduler(); - } else { + PythonInterpreter pythonInterpreter = null; + try { + pythonInterpreter = getPythonInterpreter(); + if (pythonInterpreter != null) { + return pythonInterpreter.getScheduler(); + } else { + return null; + } + } catch (InterpreterException e) { + e.printStackTrace(); return null; } } - private void restartPythonProcess() { + private void restartPythonProcess() throws InterpreterException { PythonInterpreter python = getPythonInterpreter(); python.close(); python.open(); } - protected PythonInterpreter getPythonInterpreter() { + protected PythonInterpreter getPythonInterpreter() throws InterpreterException { LazyOpenInterpreter lazy = null; PythonInterpreter python = null; Interpreter p = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName()); @@ -173,7 +180,7 @@ public class PythonDockerInterpreter extends Interpreter { return python; } - public boolean pull(InterpreterOutput out, String image) { + public boolean pull(InterpreterOutput out, String image) throws InterpreterException { int exit = 0; try { exit = runCommand(out, "docker", "pull", image); 
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java ---------------------------------------------------------------------- diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java index 50f6a8b..4f897c8 100644 --- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java +++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java @@ -57,7 +57,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import py4j.GatewayServer; -import py4j.commands.Command; /** * Python interpreter for Zeppelin. @@ -101,7 +100,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl File scriptFile = File.createTempFile("zeppelin_python-", ".py", new File("/tmp")); scriptPath = scriptFile.getAbsolutePath(); } catch (IOException e) { - throw new InterpreterException(e); + throw new RuntimeException(e); } } @@ -116,7 +115,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl return path; } - private void createPythonScript() { + private void createPythonScript() throws InterpreterException { File out = new File(scriptPath); if (out.exists() && out.isDirectory()) { @@ -131,7 +130,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl return scriptPath; } - private void copyFile(File out, String sourceFile) { + private void copyFile(File out, String sourceFile) throws InterpreterException { ClassLoader classLoader = getClass().getClassLoader(); try { FileOutputStream outStream = new FileOutputStream(out); @@ -144,7 +143,8 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl } } - private void createGatewayServerAndStartScript() throws UnknownHostException { + private void createGatewayServerAndStartScript() + throws UnknownHostException, 
InterpreterException { createPythonScript(); if (System.getenv("ZEPPELIN_HOME") != null) { py4jLibPath = System.getenv("ZEPPELIN_HOME") + File.separator + ZEPPELIN_PY4JPATH; @@ -219,11 +219,11 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl } @Override - public void open() { + public void open() throws InterpreterException { // try IPythonInterpreter first. If it is not available, we will fallback to the original // python interpreter implementation. iPythonInterpreter = getIPythonInterpreter(); - if (getProperty().getProperty("zeppelin.python.useIPython", "true").equals("true") && + if (getProperty("zeppelin.python.useIPython", "true").equals("true") && iPythonInterpreter.checkIPythonPrerequisite()) { try { iPythonInterpreter.open(); @@ -369,7 +369,8 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl } @Override - public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) { + public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) + throws InterpreterException { if (iPythonInterpreter != null) { return iPythonInterpreter.interpret(cmd, contextInterpreter); } @@ -551,7 +552,11 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl bootstrapCode += line + "\n"; } - interpret(bootstrapCode, context); + try { + interpret(bootstrapCode, context); + } catch (InterpreterException e) { + throw new IOException(e); + } } public GUI getGui() { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java ---------------------------------------------------------------------- diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java index e73d7b3..54984c3 100644 --- 
a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java +++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java @@ -22,6 +22,7 @@ import java.util.Properties; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.LazyOpenInterpreter; import org.apache.zeppelin.interpreter.WrappedInterpreter; @@ -42,7 +43,7 @@ public class PythonInterpreterPandasSql extends Interpreter { super(property); } - PythonInterpreter getPythonInterpreter() { + PythonInterpreter getPythonInterpreter() throws InterpreterException { LazyOpenInterpreter lazy = null; PythonInterpreter python = null; Interpreter p = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName()); @@ -62,7 +63,7 @@ public class PythonInterpreterPandasSql extends Interpreter { } @Override - public void open() { + public void open() throws InterpreterException { LOG.info("Open Python SQL interpreter instance: {}", this.toString()); try { @@ -76,14 +77,15 @@ public class PythonInterpreterPandasSql extends Interpreter { } @Override - public void close() { + public void close() throws InterpreterException { LOG.info("Close Python SQL interpreter instance: {}", this.toString()); Interpreter python = getPythonInterpreter(); python.close(); } @Override - public InterpreterResult interpret(String st, InterpreterContext context) { + public InterpreterResult interpret(String st, InterpreterContext context) + throws InterpreterException { LOG.info("Running SQL query: '{}' over Pandas DataFrame", st); Interpreter python = getPythonInterpreter(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java 
---------------------------------------------------------------------- diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java index 137d622..6a388c2 100644 --- a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java @@ -23,6 +23,7 @@ import org.apache.zeppelin.display.ui.Select; import org.apache.zeppelin.display.ui.TextBox; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.InterpreterOutputListener; @@ -56,7 +57,7 @@ public class IPythonInterpreterTest { private IPythonInterpreter interpreter; @Before - public void setUp() { + public void setUp() throws InterpreterException { Properties properties = new Properties(); interpreter = new IPythonInterpreter(properties); InterpreterGroup mockInterpreterGroup = mock(InterpreterGroup.class); @@ -71,11 +72,11 @@ public class IPythonInterpreterTest { @Test - public void testIPython() throws IOException, InterruptedException { + public void testIPython() throws IOException, InterruptedException, InterpreterException { testInterpreter(interpreter); } - public static void testInterpreter(final Interpreter interpreter) throws IOException, InterruptedException { + public static void testInterpreter(final Interpreter interpreter) throws IOException, InterruptedException, InterpreterException { // to make this test can run under both python2 and python3 InterpreterResult result = interpreter.interpret("from __future__ import print_function", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); @@ -261,7 +262,11 
@@ public class IPythonInterpreterTest { } catch (InterruptedException e) { e.printStackTrace(); } - interpreter.cancel(context2); + try { + interpreter.cancel(context2); + } catch (InterpreterException e) { + e.printStackTrace(); + } } }.start(); result = interpreter.interpret("import time\ntime.sleep(10)", context2); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/python/src/test/java/org/apache/zeppelin/python/PythonCondaInterpreterTest.java ---------------------------------------------------------------------- diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonCondaInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonCondaInterpreterTest.java index 8976396..e6f5fca 100644 --- a/python/src/test/java/org/apache/zeppelin/python/PythonCondaInterpreterTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/PythonCondaInterpreterTest.java @@ -37,7 +37,7 @@ public class PythonCondaInterpreterTest { private PythonInterpreter python; @Before - public void setUp() { + public void setUp() throws InterpreterException { conda = spy(new PythonCondaInterpreter(new Properties())); python = mock(PythonInterpreter.class); @@ -57,7 +57,7 @@ public class PythonCondaInterpreterTest { } @Test - public void testListEnv() throws IOException, InterruptedException { + public void testListEnv() throws IOException, InterruptedException, InterpreterException { setMockCondaEnvList(); // list available env @@ -72,7 +72,7 @@ public class PythonCondaInterpreterTest { } @Test - public void testActivateEnv() throws IOException, InterruptedException { + public void testActivateEnv() throws IOException, InterruptedException, InterpreterException { setMockCondaEnvList(); String envname = "env1"; InterpreterContext context = getInterpreterContext(); @@ -84,7 +84,7 @@ public class PythonCondaInterpreterTest { } @Test - public void testDeactivate() { + public void testDeactivate() throws InterpreterException { InterpreterContext context = 
getInterpreterContext(); conda.interpret("deactivate", context); verify(python, times(1)).open(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/python/src/test/java/org/apache/zeppelin/python/PythonDockerInterpreterTest.java ---------------------------------------------------------------------- diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonDockerInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonDockerInterpreterTest.java index 566b5e0..e590394 100644 --- a/python/src/test/java/org/apache/zeppelin/python/PythonDockerInterpreterTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/PythonDockerInterpreterTest.java @@ -41,7 +41,7 @@ public class PythonDockerInterpreterTest { private PythonInterpreter python; @Before - public void setUp() { + public void setUp() throws InterpreterException { docker = spy(new PythonDockerInterpreter(new Properties())); python = mock(PythonInterpreter.class); @@ -58,7 +58,7 @@ public class PythonDockerInterpreterTest { } @Test - public void testActivateEnv() { + public void testActivateEnv() throws InterpreterException { InterpreterContext context = getInterpreterContext(); docker.interpret("activate env", context); verify(python, times(1)).open(); @@ -68,7 +68,7 @@ public class PythonDockerInterpreterTest { } @Test - public void testDeactivate() { + public void testDeactivate() throws InterpreterException { InterpreterContext context = getInterpreterContext(); docker.interpret("deactivate", context); verify(python, times(1)).open(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java ---------------------------------------------------------------------- diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java index d649e89..877e428 100644 --- 
a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java @@ -22,6 +22,7 @@ import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterContextRunner; +import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.InterpreterOutputListener; @@ -83,7 +84,7 @@ public class PythonInterpreterMatplotlibTest implements InterpreterOutputListene } @Test - public void dependenciesAreInstalled() { + public void dependenciesAreInstalled() throws InterpreterException { // matplotlib InterpreterResult ret = python.interpret("import matplotlib", context); assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code()); @@ -94,7 +95,7 @@ public class PythonInterpreterMatplotlibTest implements InterpreterOutputListene } @Test - public void showPlot() throws IOException { + public void showPlot() throws IOException, InterpreterException { // Simple plot test InterpreterResult ret; ret = python.interpret("import matplotlib.pyplot as plt", context); @@ -111,7 +112,7 @@ public class PythonInterpreterMatplotlibTest implements InterpreterOutputListene @Test // Test for when configuration is set to auto-close figures after show(). - public void testClose() throws IOException { + public void testClose() throws IOException, InterpreterException { InterpreterResult ret; InterpreterResult ret1; InterpreterResult ret2; @@ -145,7 +146,7 @@ public class PythonInterpreterMatplotlibTest implements InterpreterOutputListene @Test // Test for when configuration is set to not auto-close figures after show(). 
- public void testNoClose() throws IOException { + public void testNoClose() throws IOException, InterpreterException { InterpreterResult ret; InterpreterResult ret1; InterpreterResult ret2; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java ---------------------------------------------------------------------- diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java index 9e918c0..5d667bd 100644 --- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java @@ -33,6 +33,7 @@ import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterContextRunner; +import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.InterpreterOutputListener; @@ -107,18 +108,18 @@ public class PythonInterpreterPandasSqlTest implements InterpreterOutputListener } @After - public void afterTest() throws IOException { + public void afterTest() throws IOException, InterpreterException { sql.close(); } @Test - public void dependenciesAreInstalled() { + public void dependenciesAreInstalled() throws InterpreterException { InterpreterResult ret = python.interpret("import pandas\nimport pandasql\nimport numpy\n", context); assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code()); } @Test - public void errorMessageIfDependenciesNotInstalled() { + public void errorMessageIfDependenciesNotInstalled() throws InterpreterException { InterpreterResult ret; 
ret = sql.interpret("SELECT * from something", context); @@ -128,7 +129,7 @@ public class PythonInterpreterPandasSqlTest implements InterpreterOutputListener } @Test - public void sqlOverTestDataPrintsTable() throws IOException { + public void sqlOverTestDataPrintsTable() throws IOException, InterpreterException { InterpreterResult ret; // given //String expectedTable = "name\tage\n\nmoon\t33\n\npark\t34"; @@ -152,7 +153,7 @@ public class PythonInterpreterPandasSqlTest implements InterpreterOutputListener } @Test - public void badSqlSyntaxFails() throws IOException { + public void badSqlSyntaxFails() throws IOException, InterpreterException { //when InterpreterResult ret = sql.interpret("select wrong syntax", context); @@ -162,7 +163,7 @@ public class PythonInterpreterPandasSqlTest implements InterpreterOutputListener } @Test - public void showDataFrame() throws IOException { + public void showDataFrame() throws IOException, InterpreterException { InterpreterResult ret; ret = python.interpret("import pandas as pd", context); ret = python.interpret("import numpy as np", context); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java ---------------------------------------------------------------------- diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java index 195935d..ef328f0 100644 --- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java @@ -38,6 +38,7 @@ import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterContextRunner; +import org.apache.zeppelin.interpreter.InterpreterException; import 
org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.InterpreterOutputListener; @@ -64,7 +65,7 @@ public class PythonInterpreterTest implements InterpreterOutputListener { } @Before - public void beforeTest() throws IOException { + public void beforeTest() throws IOException, InterpreterException { cmdHistory = ""; // python interpreter @@ -96,20 +97,20 @@ public class PythonInterpreterTest implements InterpreterOutputListener { } @Test - public void testInterpret() throws InterruptedException, IOException { + public void testInterpret() throws InterruptedException, IOException, InterpreterException { InterpreterResult result = pythonInterpreter.interpret("print (\"hi\")", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); } @Test - public void testInterpretInvalidSyntax() throws IOException { + public void testInterpretInvalidSyntax() throws IOException, InterpreterException { InterpreterResult result = pythonInterpreter.interpret("for x in range(0,3): print (\"hi\")\n", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertTrue(new String(out.getOutputAt(0).toByteArray()).contains("hi\nhi\nhi")); } @Test - public void testRedefinitionZeppelinContext() { + public void testRedefinitionZeppelinContext() throws InterpreterException { String pyRedefinitionCode = "z = 1\n"; String pyRestoreCode = "z = __zeppelin__\n"; String pyValidCode = "z.input(\"test\")\n"; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java ---------------------------------------------------------------------- diff --git a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java index 7156c37..d3ebada 100644 --- 
a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java +++ b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java @@ -69,7 +69,7 @@ public class ScaldingInterpreter extends Interpreter { @Override public void open() { numOpenInstances = numOpenInstances + 1; - String maxOpenInstancesStr = property.getProperty(MAX_OPEN_INSTANCES, + String maxOpenInstancesStr = getProperty(MAX_OPEN_INSTANCES, MAX_OPEN_INSTANCES_DEFAULT); int maxOpenInstances = 50; try { @@ -83,8 +83,8 @@ public class ScaldingInterpreter extends Interpreter { return; } logger.info("Opening instance {}", numOpenInstances); - logger.info("property: {}", property); - String argsString = property.getProperty(ARGS_STRING, ARGS_STRING_DEFAULT); + logger.info("property: {}", getProperties()); + String argsString = getProperty(ARGS_STRING, ARGS_STRING_DEFAULT); String[] args; if (argsString == null) { args = new String[0]; @@ -121,7 +121,7 @@ public class ScaldingInterpreter extends Interpreter { return new InterpreterResult(Code.SUCCESS); } InterpreterResult interpreterResult = new InterpreterResult(Code.ERROR); - if (property.getProperty(ARGS_STRING).contains("hdfs")) { + if (getProperty(ARGS_STRING).contains("hdfs")) { UserGroupInformation ugi = null; try { ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser()); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java ---------------------------------------------------------------------- diff --git a/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java b/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java index dbae6c9..b7c0043 100644 --- a/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java +++ b/shell/src/main/java/org/apache/zeppelin/shell/ShellInterpreter.java @@ -174,8 +174,8 @@ public class ShellInterpreter extends KerberosInterpreter { return false; } - 
public void createSecureConfiguration() { - Properties properties = getProperty(); + public void createSecureConfiguration() throws InterpreterException { + Properties properties = getProperties(); CommandLine cmdLine = CommandLine.parse(shell); cmdLine.addArgument("-c", false); String kinitCommand = String.format("kinit -k -t %s %s", http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java index 56b3823..a050569 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java @@ -21,12 +21,15 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.LazyOpenInterpreter; import org.apache.zeppelin.interpreter.WrappedInterpreter; import org.apache.zeppelin.python.IPythonInterpreter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.Map; import java.util.Properties; /** @@ -43,8 +46,9 @@ public class IPySparkInterpreter extends IPythonInterpreter { } @Override - public void open() { - property.setProperty("zeppelin.python", PySparkInterpreter.getPythonExec(property)); + public void open() throws InterpreterException { + setProperty("zeppelin.python", + PySparkInterpreter.getPythonExec(getProperties())); sparkInterpreter = getSparkInterpreter(); SparkConf conf = sparkInterpreter.getSparkContext().getConf(); // only set PYTHONPATH in local or yarn-client mode. 
@@ -57,7 +61,18 @@ public class IPySparkInterpreter extends IPythonInterpreter { super.open(); } - private SparkInterpreter getSparkInterpreter() { + @Override + protected Map<String, String> setupIPythonEnv() throws IOException { + Map<String, String> env = super.setupIPythonEnv(); + // set PYSPARK_PYTHON + SparkConf conf = sparkInterpreter.getSparkContext().getConf(); + if (conf.contains("spark.pyspark.python")) { + env.put("PYSPARK_PYTHON", conf.get("spark.pyspark.python")); + } + return env; + } + + private SparkInterpreter getSparkInterpreter() throws InterpreterException { LazyOpenInterpreter lazy = null; SparkInterpreter spark = null; Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName()); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index dd32059..5df4ec6 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -86,11 +86,11 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand File scriptFile = File.createTempFile("zeppelin_pyspark-", ".py"); scriptPath = scriptFile.getAbsolutePath(); } catch (IOException e) { - throw new InterpreterException(e); + throw new RuntimeException(e); } } - private void createPythonScript() { + private void createPythonScript() throws InterpreterException { ClassLoader classLoader = getClass().getClassLoader(); File out = new File(scriptPath); @@ -112,10 +112,10 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand } @Override - public void open() { + public void open() throws InterpreterException { // try 
IPySparkInterpreter first iPySparkInterpreter = getIPySparkInterpreter(); - if (property.getProperty("zeppelin.pyspark.useIPython", "true").equals("true") && + if (getProperty("zeppelin.pyspark.useIPython", "true").equals("true") && iPySparkInterpreter.checkIPythonPrerequisite()) { try { iPySparkInterpreter.open(); @@ -132,8 +132,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand } } iPySparkInterpreter = null; - - if (property.getProperty("zeppelin.pyspark.useIPython", "true").equals("true")) { + if (getProperty("zeppelin.pyspark.useIPython", "true").equals("true")) { // don't print it when it is in testing, just for easy output check in test. try { InterpreterContext.get().out.write(("IPython is not available, " + @@ -202,7 +201,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand } } - private Map setupPySparkEnv() throws IOException { + private Map setupPySparkEnv() throws IOException, InterpreterException { Map env = EnvironmentUtils.getProcEnvironment(); // only set PYTHONPATH in local or yarn-client mode. 
@@ -229,6 +228,11 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand } LOGGER.info("PYTHONPATH: " + env.get("PYTHONPATH")); + + // set PYSPARK_PYTHON + if (getSparkConf().contains("spark.pyspark.python")) { + env.put("PYSPARK_PYTHON", getSparkConf().get("spark.pyspark.python")); + } return env; } @@ -246,7 +250,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand return pythonExec; } - private void createGatewayServerAndStartScript() { + private void createGatewayServerAndStartScript() throws InterpreterException { // create python script createPythonScript(); @@ -255,7 +259,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand gatewayServer = new GatewayServer(this, port); gatewayServer.start(); - String pythonExec = getPythonExec(property); + String pythonExec = getPythonExec(getProperties()); LOGGER.info("pythonExec: " + pythonExec); CommandLine cmd = CommandLine.parse(pythonExec); cmd.addArgument(scriptPath, false); @@ -295,7 +299,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand } } - private int findRandomOpenPortOnAllLocalInterfaces() { + private int findRandomOpenPortOnAllLocalInterfaces() throws InterpreterException { int port; try (ServerSocket socket = new ServerSocket(0);) { port = socket.getLocalPort(); @@ -394,7 +398,8 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand } @Override - public InterpreterResult interpret(String st, InterpreterContext context) { + public InterpreterResult interpret(String st, InterpreterContext context) + throws InterpreterException { SparkInterpreter sparkInterpreter = getSparkInterpreter(); sparkInterpreter.populateSparkWebUrl(context); if (sparkInterpreter.isUnsupportedSparkVersion()) { @@ -500,7 +505,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand } @Override - public void cancel(InterpreterContext context) { + public 
void cancel(InterpreterContext context) throws InterpreterException { if (iPySparkInterpreter != null) { iPySparkInterpreter.cancel(context); return; @@ -520,7 +525,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand } @Override - public int getProgress(InterpreterContext context) { + public int getProgress(InterpreterContext context) throws InterpreterException { if (iPySparkInterpreter != null) { return iPySparkInterpreter.getProgress(context); } @@ -531,7 +536,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand @Override public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { + InterpreterContext interpreterContext) throws InterpreterException { if (iPySparkInterpreter != null) { return iPySparkInterpreter.completion(buf, cursor, interpreterContext); } @@ -632,7 +637,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand } - private SparkInterpreter getSparkInterpreter() { + private SparkInterpreter getSparkInterpreter() throws InterpreterException { LazyOpenInterpreter lazy = null; SparkInterpreter spark = null; Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName()); @@ -666,7 +671,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand return iPySpark; } - public SparkZeppelinContext getZeppelinContext() { + public SparkZeppelinContext getZeppelinContext() throws InterpreterException { SparkInterpreter sparkIntp = getSparkInterpreter(); if (sparkIntp != null) { return getSparkInterpreter().getZeppelinContext(); @@ -675,7 +680,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand } } - public JavaSparkContext getJavaSparkContext() { + public JavaSparkContext getJavaSparkContext() throws InterpreterException { SparkInterpreter intp = getSparkInterpreter(); if (intp == null) { return null; @@ -684,7 +689,7 @@ 
public class PySparkInterpreter extends Interpreter implements ExecuteResultHand } } - public Object getSparkSession() { + public Object getSparkSession() throws InterpreterException { SparkInterpreter intp = getSparkInterpreter(); if (intp == null) { return null; @@ -693,7 +698,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand } } - public SparkConf getSparkConf() { + public SparkConf getSparkConf() throws InterpreterException { JavaSparkContext sc = getJavaSparkContext(); if (sc == null) { return null; @@ -702,7 +707,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand } } - public SQLContext getSQLContext() { + public SQLContext getSQLContext() throws InterpreterException { SparkInterpreter intp = getSparkInterpreter(); if (intp == null) { return null;