http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 18da034..5a5110f 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -353,7 +353,7 @@ public class SparkInterpreter extends Interpreter { public boolean isYarnMode() { String master = getProperty("master"); if (master == null) { - master = getProperty().getProperty("spark.master", "local[*]"); + master = getProperty("spark.master", "local[*]"); } return master.startsWith("yarn"); } @@ -376,7 +376,7 @@ public class SparkInterpreter extends Interpreter { } conf.set("spark.scheduler.mode", "FAIR"); - Properties intpProperty = getProperty(); + Properties intpProperty = getProperties(); for (Object k : intpProperty.keySet()) { String key = (String) k; String val = toString(intpProperty.get(key)); @@ -509,7 +509,7 @@ public class SparkInterpreter extends Interpreter { } conf.set("spark.scheduler.mode", "FAIR"); - Properties intpProperty = getProperty(); + Properties intpProperty = getProperties(); for (Object k : intpProperty.keySet()) { String key = (String) k; String val = toString(intpProperty.get(key)); @@ -543,19 +543,19 @@ public class SparkInterpreter extends Interpreter { } @Override - public void open() { + public void open() throws InterpreterException { this.enableSupportedVersionCheck = java.lang.Boolean.parseBoolean( - property.getProperty("zeppelin.spark.enableSupportedVersionCheck", "true")); + getProperty("zeppelin.spark.enableSupportedVersionCheck", "true")); // set properties and do login before creating any spark stuff for secured cluster if (isYarnMode()) { System.setProperty("SPARK_YARN_MODE", 
"true"); } - if (getProperty().containsKey("spark.yarn.keytab") && - getProperty().containsKey("spark.yarn.principal")) { + if (getProperties().containsKey("spark.yarn.keytab") && + getProperties().containsKey("spark.yarn.principal")) { try { - String keytab = getProperty().getProperty("spark.yarn.keytab"); - String principal = getProperty().getProperty("spark.yarn.principal"); + String keytab = getProperties().getProperty("spark.yarn.keytab"); + String principal = getProperties().getProperty("spark.yarn.principal"); UserGroupInformation.loginUserFromKeytab(principal, keytab); } catch (IOException e) { throw new RuntimeException("Can not pass kerberos authentication", e); @@ -963,7 +963,7 @@ public class SparkInterpreter extends Interpreter { sparkUrl = getSparkUIUrl(); Map<String, String> infos = new java.util.HashMap<>(); infos.put("url", sparkUrl); - String uiEnabledProp = property.getProperty("spark.ui.enabled", "true"); + String uiEnabledProp = getProperty("spark.ui.enabled", "true"); java.lang.Boolean uiEnabled = java.lang.Boolean.parseBoolean( uiEnabledProp.trim()); if (!uiEnabled) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java index ca52f79..1bdd4dc 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java @@ -54,7 +54,7 @@ public class SparkRInterpreter extends Interpreter { } @Override - public void open() { + public void open() throws InterpreterException { String rCmdPath = getProperty("zeppelin.R.cmd"); String sparkRLibPath; @@ -105,7 +105,8 @@ public class SparkRInterpreter extends Interpreter { } @Override - public InterpreterResult interpret(String lines, InterpreterContext interpreterContext) { + public InterpreterResult interpret(String lines, InterpreterContext interpreterContext) + throws InterpreterException { SparkInterpreter sparkInterpreter = getSparkInterpreter(); sparkInterpreter.populateSparkWebUrl(interpreterContext); @@ -220,7 +221,7 @@ public class SparkRInterpreter extends Interpreter { return new ArrayList<>(); } - private SparkInterpreter getSparkInterpreter() { + private SparkInterpreter getSparkInterpreter() throws InterpreterException { LazyOpenInterpreter lazy = null; SparkInterpreter spark = null; Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName()); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java index 134a65f..9709f9e 100644 --- 
a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java @@ -59,7 +59,7 @@ public class SparkSqlInterpreter extends Interpreter { this.maxResult = Integer.parseInt(getProperty(MAX_RESULTS)); } - private SparkInterpreter getSparkInterpreter() { + private SparkInterpreter getSparkInterpreter() throws InterpreterException { LazyOpenInterpreter lazy = null; SparkInterpreter spark = null; Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName()); @@ -86,7 +86,8 @@ public class SparkSqlInterpreter extends Interpreter { public void close() {} @Override - public InterpreterResult interpret(String st, InterpreterContext context) { + public InterpreterResult interpret(String st, InterpreterContext context) + throws InterpreterException { SQLContext sqlc = null; SparkInterpreter sparkInterpreter = getSparkInterpreter(); @@ -134,7 +135,7 @@ public class SparkSqlInterpreter extends Interpreter { } @Override - public void cancel(InterpreterContext context) { + public void cancel(InterpreterContext context) throws InterpreterException { SparkInterpreter sparkInterpreter = getSparkInterpreter(); SQLContext sqlc = sparkInterpreter.getSQLContext(); SparkContext sc = sqlc.sparkContext(); @@ -149,7 +150,7 @@ public class SparkSqlInterpreter extends Interpreter { @Override - public int getProgress(InterpreterContext context) { + public int getProgress(InterpreterContext context) throws InterpreterException { SparkInterpreter sparkInterpreter = getSparkInterpreter(); return sparkInterpreter.getProgress(context); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/spark/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java 
b/spark/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java index 413c690..09b8d44 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkZeppelinContext.java @@ -79,7 +79,7 @@ public class SparkZeppelinContext extends BaseZeppelinContext { } if (supportedClasses.isEmpty()) { - throw new InterpreterException("Can not load Dataset/DataFrame/SchemaRDD class"); + throw new RuntimeException("Can not load Dataset/DataFrame/SchemaRDD class"); } } @@ -112,7 +112,7 @@ public class SparkZeppelinContext extends BaseZeppelinContext { } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException | ClassCastException e) { sc.clearJobGroup(); - throw new InterpreterException(e); + throw new RuntimeException(e); } List<Attribute> columns = null; @@ -129,7 +129,7 @@ public class SparkZeppelinContext extends BaseZeppelinContext { .asJava(); } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { - throw new InterpreterException(e); + throw new RuntimeException(e); } StringBuilder msg = new StringBuilder(); @@ -165,7 +165,7 @@ public class SparkZeppelinContext extends BaseZeppelinContext { } } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { - throw new InterpreterException(e); + throw new RuntimeException(e); } if (rows.length > maxResult) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java index b46001a..130d849 100644 --- 
a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java @@ -117,7 +117,7 @@ public class ZeppelinR implements ExecuteResultHandler { File scriptFile = File.createTempFile("zeppelin_sparkr-", ".R"); scriptPath = scriptFile.getAbsolutePath(); } catch (IOException e) { - throw new InterpreterException(e); + throw new RuntimeException(e); } } @@ -125,7 +125,7 @@ public class ZeppelinR implements ExecuteResultHandler { * Start R repl * @throws IOException */ - public void open() throws IOException { + public void open() throws IOException, InterpreterException { createRScript(); zeppelinR.put(hashCode(), this); @@ -170,7 +170,7 @@ public class ZeppelinR implements ExecuteResultHandler { * @param expr * @return */ - public Object eval(String expr) { + public Object eval(String expr) throws InterpreterException { synchronized (this) { rRequestObject = new Request("eval", expr, null); return request(); @@ -182,7 +182,7 @@ public class ZeppelinR implements ExecuteResultHandler { * @param key * @param value */ - public void set(String key, Object value) { + public void set(String key, Object value) throws InterpreterException { synchronized (this) { rRequestObject = new Request("set", key, value); request(); @@ -194,7 +194,7 @@ public class ZeppelinR implements ExecuteResultHandler { * @param key * @return */ - public Object get(String key) { + public Object get(String key) throws InterpreterException { synchronized (this) { rRequestObject = new Request("get", key, null); return request(); @@ -206,7 +206,7 @@ public class ZeppelinR implements ExecuteResultHandler { * @param key * @return */ - public String getS0(String key) { + public String getS0(String key) throws InterpreterException { synchronized (this) { rRequestObject = new Request("getS", key, null); return (String) request(); @@ -217,7 +217,7 @@ public class ZeppelinR implements ExecuteResultHandler { * Send request to r repl and return 
response * @return responseValue */ - private Object request() throws RuntimeException { + private Object request() throws RuntimeException, InterpreterException { if (!rScriptRunning) { throw new RuntimeException("r repl is not running"); } @@ -332,7 +332,7 @@ public class ZeppelinR implements ExecuteResultHandler { /** * Create R script in tmp dir */ - private void createRScript() { + private void createRScript() throws InterpreterException { ClassLoader classLoader = getClass().getClassLoader(); File out = new File(scriptPath); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/spark/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java index 3f7cf75..faf0473 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java @@ -24,6 +24,7 @@ import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterContextRunner; +import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.InterpreterOutputListener; @@ -55,7 +56,7 @@ public class IPySparkInterpreterTest { private InterpreterGroup intpGroup; @Before - public void setup() { + public void setup() throws InterpreterException { Properties p = new Properties(); p.setProperty("spark.master", "local[4]"); p.setProperty("master", "local[4]"); @@ -90,7 +91,7 @@ public class IPySparkInterpreterTest { } @Test - public void testBasics() throws InterruptedException, 
IOException { + public void testBasics() throws InterruptedException, IOException, InterpreterException { // all the ipython test should pass too. IPythonInterpreterTest.testInterpreter(iPySparkInterpreter); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java index d695037..692447e 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java @@ -64,7 +64,7 @@ public class PySparkInterpreterMatplotlibTest { * normally handles this in real use cases. */ @Override - public InterpreterResult interpret(String st, InterpreterContext context) { + public InterpreterResult interpret(String st, InterpreterContext context) throws InterpreterException { context.out.clear(); InterpreterResult result = super.interpret(st, context); List<InterpreterResultMessage> resultMessages = null; @@ -140,7 +140,7 @@ public class PySparkInterpreterMatplotlibTest { } @Test - public void dependenciesAreInstalled() { + public void dependenciesAreInstalled() throws InterpreterException { // matplotlib InterpreterResult ret = pyspark.interpret("import matplotlib", context); assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code()); @@ -151,7 +151,7 @@ public class PySparkInterpreterMatplotlibTest { } @Test - public void showPlot() { + public void showPlot() throws InterpreterException { // Simple plot test InterpreterResult ret; ret = pyspark.interpret("import matplotlib.pyplot as plt", context); @@ -168,7 +168,7 @@ public class PySparkInterpreterMatplotlibTest { @Test // Test for when configuration is set 
to auto-close figures after show(). - public void testClose() { + public void testClose() throws InterpreterException { InterpreterResult ret; InterpreterResult ret1; InterpreterResult ret2; @@ -195,7 +195,7 @@ public class PySparkInterpreterMatplotlibTest { @Test // Test for when configuration is set to not auto-close figures after show(). - public void testNoClose() { + public void testNoClose() throws InterpreterException { InterpreterResult ret; InterpreterResult ret1; InterpreterResult ret2; @@ -222,7 +222,7 @@ public class PySparkInterpreterMatplotlibTest { @Test // Test angular mode - public void testAngular() { + public void testAngular() throws InterpreterException { InterpreterResult ret; ret = pyspark.interpret("import matplotlib.pyplot as plt", context); ret = pyspark.interpret("plt.close()", context); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java index 7a4abd6..a95c5ef 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java @@ -112,7 +112,7 @@ public class PySparkInterpreterTest { } @Test - public void testBasicIntp() { + public void testBasicIntp() throws InterpreterException { if (getSparkVersionNumber() > 11) { assertEquals(InterpreterResult.Code.SUCCESS, pySparkInterpreter.interpret("a = 1\n", context).code()); @@ -136,7 +136,7 @@ public class PySparkInterpreterTest { } @Test - public void testCompletion() { + public void testCompletion() throws InterpreterException { if (getSparkVersionNumber() > 11) { List<InterpreterCompletion> completions = pySparkInterpreter.completion("sc.", "sc.".length(), null); 
assertTrue(completions.size() > 0); @@ -144,7 +144,7 @@ public class PySparkInterpreterTest { } @Test - public void testRedefinitionZeppelinContext() { + public void testRedefinitionZeppelinContext() throws InterpreterException { if (getSparkVersionNumber() > 11) { String redefinitionCode = "z = 1\n"; String restoreCode = "z = __zeppelin__\n"; @@ -162,7 +162,12 @@ public class PySparkInterpreterTest { @Override public void run() { String code = "import time\nwhile True:\n time.sleep(1)" ; - InterpreterResult ret = pySparkInterpreter.interpret(code, context); + InterpreterResult ret = null; + try { + ret = pySparkInterpreter.interpret(code, context); + } catch (InterpreterException e) { + e.printStackTrace(); + } assertNotNull(ret); Pattern expectedMessage = Pattern.compile("KeyboardInterrupt"); Matcher m = expectedMessage.matcher(ret.message().toString()); @@ -171,7 +176,7 @@ public class PySparkInterpreterTest { } @Test - public void testCancelIntp() throws InterruptedException { + public void testCancelIntp() throws InterruptedException, InterpreterException { if (getSparkVersionNumber() > 11) { assertEquals(InterpreterResult.Code.SUCCESS, pySparkInterpreter.interpret("a = 1\n", context).code()); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java index ece292b..aaf13c3 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java @@ -213,7 +213,7 @@ public class SparkInterpreterTest { } @Test - public void testSparkSql() throws IOException { + public void testSparkSql() throws IOException, InterpreterException { repl.interpret("case class 
Person(name:String, age:Int)\n", context); repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context); assertEquals(Code.SUCCESS, repl.interpret("people.take(3)", context).code()); @@ -243,7 +243,7 @@ public class SparkInterpreterTest { @Test public void emptyConfigurationVariablesOnlyForNonSparkProperties() { - Properties intpProperty = repl.getProperty(); + Properties intpProperty = repl.getProperties(); SparkConf sparkConf = repl.getSparkContext().getConf(); for (Object oKey : intpProperty.keySet()) { String key = (String) oKey; @@ -256,7 +256,7 @@ public class SparkInterpreterTest { } @Test - public void shareSingleSparkContext() throws InterruptedException, IOException { + public void shareSingleSparkContext() throws InterruptedException, IOException, InterpreterException { // create another SparkInterpreter SparkInterpreter repl2 = new SparkInterpreter(getSparkTestProperties(tmpDir)); repl2.setInterpreterGroup(intpGroup); @@ -272,7 +272,7 @@ public class SparkInterpreterTest { } @Test - public void testEnableImplicitImport() throws IOException { + public void testEnableImplicitImport() throws IOException, InterpreterException { if (getSparkVersionNumber(repl) >= 13) { // Set option of importing implicits to "true", and initialize new Spark repl Properties p = getSparkTestProperties(tmpDir); @@ -289,7 +289,7 @@ public class SparkInterpreterTest { } @Test - public void testDisableImplicitImport() throws IOException { + public void testDisableImplicitImport() throws IOException, InterpreterException { if (getSparkVersionNumber(repl) >= 13) { // Set option of importing implicits to "false", and initialize new Spark repl // this test should return error status when creating DataFrame from sequence http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java 
---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java index ebb5e9a..3e33f44 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java @@ -85,7 +85,7 @@ public class SparkSqlInterpreterTest { } @Test - public void test() { + public void test() throws InterpreterException { repl.interpret("case class Test(name:String, age:Int)", context); repl.interpret("val test = sc.parallelize(Seq(Test(\"moon\", 33), Test(\"jobs\", 51), Test(\"gates\", 51), Test(\"park\", 34)))", context); if (isDataFrameSupported()) { @@ -107,7 +107,7 @@ public class SparkSqlInterpreterTest { } @Test - public void testStruct() { + public void testStruct() throws InterpreterException { repl.interpret("case class Person(name:String, age:Int)", context); repl.interpret("case class People(group:String, person:Person)", context); repl.interpret( @@ -124,7 +124,7 @@ public class SparkSqlInterpreterTest { } @Test - public void test_null_value_in_row() { + public void test_null_value_in_row() throws InterpreterException { repl.interpret("import org.apache.spark.sql._", context); if (isDataFrameSupported()) { repl.interpret( @@ -162,7 +162,7 @@ public class SparkSqlInterpreterTest { } @Test - public void testMaxResults() { + public void testMaxResults() throws InterpreterException { repl.interpret("case class P(age:Int)", context); repl.interpret( "val gr = sc.parallelize(Seq(P(1),P(2),P(3),P(4),P(5),P(6),P(7),P(8),P(9),P(10),P(11)))", http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-interpreter/pom.xml ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml index 00184da..3bb50c7 100644 --- 
a/zeppelin-interpreter/pom.xml +++ b/zeppelin-interpreter/pom.xml @@ -67,6 +67,11 @@ </dependency> <dependency> + <groupId>commons-configuration</groupId> + <artifactId>commons-configuration</artifactId> + </dependency> + + <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-exec</artifactId> <version>${commons.exec.version}</version> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java new file mode 100644 index 0000000..3a82bc5 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -0,0 +1,847 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.conf; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.commons.configuration.tree.ConfigurationNode; +import org.apache.commons.lang.StringUtils; +import org.apache.zeppelin.util.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.net.URL; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Zeppelin configuration. + * + */ +public class ZeppelinConfiguration extends XMLConfiguration { + private static final String ZEPPELIN_SITE_XML = "zeppelin-site.xml"; + private static final long serialVersionUID = 4749305895693848035L; + private static final Logger LOG = LoggerFactory.getLogger(ZeppelinConfiguration.class); + + private static final String HELIUM_PACKAGE_DEFAULT_URL = + "https://s3.amazonaws.com/helium-package/helium.json"; + private static ZeppelinConfiguration conf; + + public ZeppelinConfiguration(URL url) throws ConfigurationException { + setDelimiterParsingDisabled(true); + load(url); + } + + public ZeppelinConfiguration() { + ConfVars[] vars = ConfVars.values(); + for (ConfVars v : vars) { + if (v.getType() == ConfVars.VarType.BOOLEAN) { + this.setProperty(v.getVarName(), v.getBooleanValue()); + } else if (v.getType() == ConfVars.VarType.LONG) { + this.setProperty(v.getVarName(), v.getLongValue()); + } else if (v.getType() == ConfVars.VarType.INT) { + this.setProperty(v.getVarName(), v.getIntValue()); + } else if (v.getType() == ConfVars.VarType.FLOAT) { + this.setProperty(v.getVarName(), v.getFloatValue()); + } else if (v.getType() == ConfVars.VarType.STRING) { + this.setProperty(v.getVarName(), v.getStringValue()); + } else { + throw new RuntimeException("Unsupported VarType"); + } + } + + } + + + /** + * Load from resource. 
+ *url = ZeppelinConfiguration.class.getResource(ZEPPELIN_SITE_XML); + * @throws ConfigurationException + */ + public static synchronized ZeppelinConfiguration create() { + if (conf != null) { + return conf; + } + + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + URL url; + + url = ZeppelinConfiguration.class.getResource(ZEPPELIN_SITE_XML); + if (url == null) { + ClassLoader cl = ZeppelinConfiguration.class.getClassLoader(); + if (cl != null) { + url = cl.getResource(ZEPPELIN_SITE_XML); + } + } + if (url == null) { + url = classLoader.getResource(ZEPPELIN_SITE_XML); + } + + if (url == null) { + LOG.warn("Failed to load configuration, proceeding with a default"); + conf = new ZeppelinConfiguration(); + } else { + try { + LOG.info("Load configuration from " + url); + conf = new ZeppelinConfiguration(url); + } catch (ConfigurationException e) { + LOG.warn("Failed to load configuration from " + url + " proceeding with a default", e); + conf = new ZeppelinConfiguration(); + } + } + + LOG.info("Server Host: " + conf.getServerAddress()); + if (conf.useSsl() == false) { + LOG.info("Server Port: " + conf.getServerPort()); + } else { + LOG.info("Server SSL Port: " + conf.getServerSslPort()); + } + LOG.info("Context Path: " + conf.getServerContextPath()); + LOG.info("Zeppelin Version: " + Util.getVersion()); + + return conf; + } + + + private String getStringValue(String name, String d) { + List<ConfigurationNode> properties = getRootNode().getChildren(); + if (properties == null || properties.isEmpty()) { + return d; + } + for (ConfigurationNode p : properties) { + if (p.getChildren("name") != null && !p.getChildren("name").isEmpty() + && name.equals(p.getChildren("name").get(0).getValue())) { + return (String) p.getChildren("value").get(0).getValue(); + } + } + return d; + } + + private int getIntValue(String name, int d) { + List<ConfigurationNode> properties = getRootNode().getChildren(); + if (properties == null || properties.isEmpty()) { + 
return d; + } + for (ConfigurationNode p : properties) { + if (p.getChildren("name") != null && !p.getChildren("name").isEmpty() + && name.equals(p.getChildren("name").get(0).getValue())) { + return Integer.parseInt((String) p.getChildren("value").get(0).getValue()); + } + } + return d; + } + + private long getLongValue(String name, long d) { + List<ConfigurationNode> properties = getRootNode().getChildren(); + if (properties == null || properties.isEmpty()) { + return d; + } + for (ConfigurationNode p : properties) { + if (p.getChildren("name") != null && !p.getChildren("name").isEmpty() + && name.equals(p.getChildren("name").get(0).getValue())) { + return Long.parseLong((String) p.getChildren("value").get(0).getValue()); + } + } + return d; + } + + private float getFloatValue(String name, float d) { + List<ConfigurationNode> properties = getRootNode().getChildren(); + if (properties == null || properties.isEmpty()) { + return d; + } + for (ConfigurationNode p : properties) { + if (p.getChildren("name") != null && !p.getChildren("name").isEmpty() + && name.equals(p.getChildren("name").get(0).getValue())) { + return Float.parseFloat((String) p.getChildren("value").get(0).getValue()); + } + } + return d; + } + + private boolean getBooleanValue(String name, boolean d) { + List<ConfigurationNode> properties = getRootNode().getChildren(); + if (properties == null || properties.isEmpty()) { + return d; + } + for (ConfigurationNode p : properties) { + if (p.getChildren("name") != null && !p.getChildren("name").isEmpty() + && name.equals(p.getChildren("name").get(0).getValue())) { + return Boolean.parseBoolean((String) p.getChildren("value").get(0).getValue()); + } + } + return d; + } + + public String getString(ConfVars c) { + return getString(c.name(), c.getVarName(), c.getStringValue()); + } + + public String getString(String envName, String propertyName, String defaultValue) { + if (System.getenv(envName) != null) { + return System.getenv(envName); + } + if 
(System.getProperty(propertyName) != null) { + return System.getProperty(propertyName); + } + + return getStringValue(propertyName, defaultValue); + } + + public int getInt(ConfVars c) { + return getInt(c.name(), c.getVarName(), c.getIntValue()); + } + + public int getInt(String envName, String propertyName, int defaultValue) { + if (System.getenv(envName) != null) { + return Integer.parseInt(System.getenv(envName)); + } + + if (System.getProperty(propertyName) != null) { + return Integer.parseInt(System.getProperty(propertyName)); + } + return getIntValue(propertyName, defaultValue); + } + + public long getLong(ConfVars c) { + return getLong(c.name(), c.getVarName(), c.getLongValue()); + } + + public long getLong(String envName, String propertyName, long defaultValue) { + if (System.getenv(envName) != null) { + return Long.parseLong(System.getenv(envName)); + } + + if (System.getProperty(propertyName) != null) { + return Long.parseLong(System.getProperty(propertyName)); + } + return getLongValue(propertyName, defaultValue); + } + + public float getFloat(ConfVars c) { + return getFloat(c.name(), c.getVarName(), c.getFloatValue()); + } + + public float getFloat(String envName, String propertyName, float defaultValue) { + if (System.getenv(envName) != null) { + return Float.parseFloat(System.getenv(envName)); + } + if (System.getProperty(propertyName) != null) { + return Float.parseFloat(System.getProperty(propertyName)); + } + return getFloatValue(propertyName, defaultValue); + } + + public boolean getBoolean(ConfVars c) { + return getBoolean(c.name(), c.getVarName(), c.getBooleanValue()); + } + + public boolean getBoolean(String envName, String propertyName, boolean defaultValue) { + if (System.getenv(envName) != null) { + return Boolean.parseBoolean(System.getenv(envName)); + } + + if (System.getProperty(propertyName) != null) { + return Boolean.parseBoolean(System.getProperty(propertyName)); + } + return getBooleanValue(propertyName, defaultValue); + } + + public 
boolean useSsl() { + return getBoolean(ConfVars.ZEPPELIN_SSL); + } + + public int getServerSslPort() { + return getInt(ConfVars.ZEPPELIN_SSL_PORT); + } + + public boolean useClientAuth() { + return getBoolean(ConfVars.ZEPPELIN_SSL_CLIENT_AUTH); + } + + public String getServerAddress() { + return getString(ConfVars.ZEPPELIN_ADDR); + } + + public int getServerPort() { + return getInt(ConfVars.ZEPPELIN_PORT); + } + + public String getServerContextPath() { + return getString(ConfVars.ZEPPELIN_SERVER_CONTEXT_PATH); + } + + public String getKeyStorePath() { + String path = getString(ConfVars.ZEPPELIN_SSL_KEYSTORE_PATH); + if (path != null && path.startsWith("/") || isWindowsPath(path)) { + return path; + } else { + return getRelativeDir( + String.format("%s/%s", + getConfDir(), + path)); + } + } + + public String getKeyStoreType() { + return getString(ConfVars.ZEPPELIN_SSL_KEYSTORE_TYPE); + } + + public String getKeyStorePassword() { + return getString(ConfVars.ZEPPELIN_SSL_KEYSTORE_PASSWORD); + } + + public String getKeyManagerPassword() { + String password = getString(ConfVars.ZEPPELIN_SSL_KEY_MANAGER_PASSWORD); + if (password == null) { + return getKeyStorePassword(); + } else { + return password; + } + } + + public String getTrustStorePath() { + String path = getString(ConfVars.ZEPPELIN_SSL_TRUSTSTORE_PATH); + if (path == null) { + path = getKeyStorePath(); + } + if (path != null && path.startsWith("/") || isWindowsPath(path)) { + return path; + } else { + return getRelativeDir( + String.format("%s/%s", + getConfDir(), + path)); + } + } + + public String getTrustStoreType() { + String type = getString(ConfVars.ZEPPELIN_SSL_TRUSTSTORE_TYPE); + if (type == null) { + return getKeyStoreType(); + } else { + return type; + } + } + + public String getTrustStorePassword() { + String password = getString(ConfVars.ZEPPELIN_SSL_TRUSTSTORE_PASSWORD); + if (password == null) { + return getKeyStorePassword(); + } else { + return password; + } + } + + public String getNotebookDir() 
{ + return getString(ConfVars.ZEPPELIN_NOTEBOOK_DIR); + } + + public String getUser() { + return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_USER); + } + + public String getBucketName() { + return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_BUCKET); + } + + public String getEndpoint() { + return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_ENDPOINT); + } + + public String getS3KMSKeyID() { + return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_KMS_KEY_ID); + } + + public String getS3KMSKeyRegion() { + return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_KMS_KEY_REGION); + } + + public String getS3EncryptionMaterialsProviderClass() { + return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_EMP); + } + + public boolean isS3ServerSideEncryption() { + return getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_S3_SSE); + } + + public String getMongoUri() { + return getString(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_URI); + } + + public String getMongoDatabase() { + return getString(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_DATABASE); + } + + public String getMongoCollection() { + return getString(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_COLLECTION); + } + + public boolean getMongoAutoimport() { + return getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT); + } + + public String getInterpreterListPath() { + return getRelativeDir(String.format("%s/interpreter-list", getConfDir())); + } + + public String getInterpreterDir() { + return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_DIR); + } + + public String getInterpreterJson() { + return getString(ConfVars.ZEPPELIN_INTERPRETER_JSON); + } + + public String getInterpreterSettingPath() { + return getRelativeDir(String.format("%s/interpreter.json", getConfDir())); + } + + public String getHeliumConfPath() { + return getRelativeDir(String.format("%s/helium.json", getConfDir())); + } + + public String getHeliumRegistry() { + return getRelativeDir(ConfVars.ZEPPELIN_HELIUM_REGISTRY); + } + + public String getHeliumNodeInstallerUrl() { + return getString(ConfVars.ZEPPELIN_HELIUM_NODE_INSTALLER_URL); 
+ } + + public String getHeliumNpmInstallerUrl() { + return getString(ConfVars.ZEPPELIN_HELIUM_NPM_INSTALLER_URL); + } + + public String getHeliumYarnInstallerUrl() { + return getString(ConfVars.ZEPPELIN_HELIUM_YARNPKG_INSTALLER_URL); + } + + public String getNotebookAuthorizationPath() { + return getRelativeDir(String.format("%s/notebook-authorization.json", getConfDir())); + } + + public Boolean credentialsPersist() { + return getBoolean(ConfVars.ZEPPELIN_CREDENTIALS_PERSIST); + } + + public String getCredentialsEncryptKey() { + return getString(ConfVars.ZEPPELIN_CREDENTIALS_ENCRYPT_KEY); + } + + public String getCredentialsPath() { + return getRelativeDir(String.format("%s/credentials.json", getConfDir())); + } + + public String getShiroPath() { + String shiroPath = getRelativeDir(String.format("%s/shiro.ini", getConfDir())); + return new File(shiroPath).exists() ? shiroPath : StringUtils.EMPTY; + } + + public String getInterpreterRemoteRunnerPath() { + return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER); + } + + public String getInterpreterLocalRepoPath() { + return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO); + } + + public String getInterpreterMvnRepoPath() { + return getString(ConfVars.ZEPPELIN_INTERPRETER_DEP_MVNREPO); + } + + public String getRelativeDir(ConfVars c) { + return getRelativeDir(getString(c)); + } + + public String getRelativeDir(String path) { + if (path != null && path.startsWith("/") || isWindowsPath(path)) { + return path; + } else { + return getString(ConfVars.ZEPPELIN_HOME) + "/" + path; + } + } + + public String getCallbackPortRange() { + return getString(ConfVars.ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE); + } + + public boolean isWindowsPath(String path){ + return path.matches("^[A-Za-z]:\\\\.*"); + } + + public boolean isAnonymousAllowed() { + return getBoolean(ConfVars.ZEPPELIN_ANONYMOUS_ALLOWED); + } + + public boolean isNotebokPublic() { + return getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_PUBLIC); + } + + 
public String getConfDir() { + return getRelativeDir(ConfVars.ZEPPELIN_CONF_DIR); + } + + public List<String> getAllowedOrigins() + { + if (getString(ConfVars.ZEPPELIN_ALLOWED_ORIGINS).isEmpty()) { + return Arrays.asList(new String[0]); + } + + return Arrays.asList(getString(ConfVars.ZEPPELIN_ALLOWED_ORIGINS).toLowerCase().split(",")); + } + + public String getWebsocketMaxTextMessageSize() { + return getString(ConfVars.ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE); + } + + public String getJettyName() { + return getString(ConfVars.ZEPPELIN_SERVER_JETTY_NAME); + } + + + public String getXFrameOptions() { + return getString(ConfVars.ZEPPELIN_SERVER_XFRAME_OPTIONS); + } + + public String getXxssProtection() { + return getString(ConfVars.ZEPPELIN_SERVER_X_XSS_PROTECTION); + } + + public String getStrictTransport() { + return getString(ConfVars.ZEPPELIN_SERVER_STRICT_TRANSPORT); + } + + + public Map<String, String> dumpConfigurations(ZeppelinConfiguration conf, + ConfigurationKeyPredicate predicate) { + Map<String, String> configurations = new HashMap<>(); + + for (ConfVars v : ConfVars.values()) { + String key = v.getVarName(); + + if (!predicate.apply(key)) { + continue; + } + + ConfVars.VarType type = v.getType(); + Object value = null; + if (type == ConfVars.VarType.BOOLEAN) { + value = conf.getBoolean(v); + } else if (type == ConfVars.VarType.LONG) { + value = conf.getLong(v); + } else if (type == ConfVars.VarType.INT) { + value = conf.getInt(v); + } else if (type == ConfVars.VarType.FLOAT) { + value = conf.getFloat(v); + } else if (type == ConfVars.VarType.STRING) { + value = conf.getString(v); + } + + if (value != null) { + configurations.put(key, value.toString()); + } + } + return configurations; + } + + /** + * Predication whether key/value pair should be included or not + */ + public interface ConfigurationKeyPredicate { + boolean apply(String key); + } + + /** + * Wrapper class. 
+ */ + public static enum ConfVars { + ZEPPELIN_HOME("zeppelin.home", "./"), + ZEPPELIN_ADDR("zeppelin.server.addr", "0.0.0.0"), + ZEPPELIN_PORT("zeppelin.server.port", 8080), + ZEPPELIN_SERVER_CONTEXT_PATH("zeppelin.server.context.path", "/"), + ZEPPELIN_SSL("zeppelin.ssl", false), + ZEPPELIN_SSL_PORT("zeppelin.server.ssl.port", 8443), + ZEPPELIN_SSL_CLIENT_AUTH("zeppelin.ssl.client.auth", false), + ZEPPELIN_SSL_KEYSTORE_PATH("zeppelin.ssl.keystore.path", "keystore"), + ZEPPELIN_SSL_KEYSTORE_TYPE("zeppelin.ssl.keystore.type", "JKS"), + ZEPPELIN_SSL_KEYSTORE_PASSWORD("zeppelin.ssl.keystore.password", ""), + ZEPPELIN_SSL_KEY_MANAGER_PASSWORD("zeppelin.ssl.key.manager.password", null), + ZEPPELIN_SSL_TRUSTSTORE_PATH("zeppelin.ssl.truststore.path", null), + ZEPPELIN_SSL_TRUSTSTORE_TYPE("zeppelin.ssl.truststore.type", null), + ZEPPELIN_SSL_TRUSTSTORE_PASSWORD("zeppelin.ssl.truststore.password", null), + ZEPPELIN_WAR("zeppelin.war", "zeppelin-web/dist"), + ZEPPELIN_WAR_TEMPDIR("zeppelin.war.tempdir", "webapps"), + ZEPPELIN_INTERPRETERS("zeppelin.interpreters", "org.apache.zeppelin.spark.SparkInterpreter," + + "org.apache.zeppelin.spark.PySparkInterpreter," + + "org.apache.zeppelin.rinterpreter.RRepl," + + "org.apache.zeppelin.rinterpreter.KnitR," + + "org.apache.zeppelin.spark.SparkRInterpreter," + + "org.apache.zeppelin.spark.SparkSqlInterpreter," + + "org.apache.zeppelin.spark.DepInterpreter," + + "org.apache.zeppelin.markdown.Markdown," + + "org.apache.zeppelin.angular.AngularInterpreter," + + "org.apache.zeppelin.shell.ShellInterpreter," + + "org.apache.zeppelin.livy.LivySparkInterpreter," + + "org.apache.zeppelin.livy.LivySparkSQLInterpreter," + + "org.apache.zeppelin.livy.LivyPySparkInterpreter," + + "org.apache.zeppelin.livy.LivyPySpark3Interpreter," + + "org.apache.zeppelin.livy.LivySparkRInterpreter," + + "org.apache.zeppelin.alluxio.AlluxioInterpreter," + + "org.apache.zeppelin.file.HDFSFileInterpreter," + + "org.apache.zeppelin.pig.PigInterpreter," + + 
"org.apache.zeppelin.pig.PigQueryInterpreter," + + "org.apache.zeppelin.flink.FlinkInterpreter," + + "org.apache.zeppelin.python.PythonInterpreter," + + "org.apache.zeppelin.python.PythonInterpreterPandasSql," + + "org.apache.zeppelin.python.PythonCondaInterpreter," + + "org.apache.zeppelin.python.PythonDockerInterpreter," + + "org.apache.zeppelin.ignite.IgniteInterpreter," + + "org.apache.zeppelin.ignite.IgniteSqlInterpreter," + + "org.apache.zeppelin.lens.LensInterpreter," + + "org.apache.zeppelin.cassandra.CassandraInterpreter," + + "org.apache.zeppelin.geode.GeodeOqlInterpreter," + + "org.apache.zeppelin.kylin.KylinInterpreter," + + "org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter," + + "org.apache.zeppelin.scalding.ScaldingInterpreter," + + "org.apache.zeppelin.jdbc.JDBCInterpreter," + + "org.apache.zeppelin.hbase.HbaseInterpreter," + + "org.apache.zeppelin.bigquery.BigQueryInterpreter," + + "org.apache.zeppelin.beam.BeamInterpreter," + + "org.apache.zeppelin.scio.ScioInterpreter," + + "org.apache.zeppelin.groovy.GroovyInterpreter," + + "org.apache.zeppelin.neo4j.Neo4jCypherInterpreter" + ), + ZEPPELIN_INTERPRETER_JSON("zeppelin.interpreter.setting", "interpreter-setting.json"), + ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"), + ZEPPELIN_INTERPRETER_LOCALREPO("zeppelin.interpreter.localRepo", "local-repo"), + ZEPPELIN_INTERPRETER_DEP_MVNREPO("zeppelin.interpreter.dep.mvnRepo", + "http://repo1.maven.org/maven2/"), + ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000), + ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10), + ZEPPELIN_INTERPRETER_GROUP_ORDER("zeppelin.interpreter.group.order", "spark,md,angular,sh," + + "livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch," + + "scalding,jdbc,hbase,bigquery,beam,pig,scio,groovy,neo4j"), + ZEPPELIN_INTERPRETER_OUTPUT_LIMIT("zeppelin.interpreter.output.limit", 1024 * 100), + 
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"), + ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"), + // use specified notebook (id) as homescreen + ZEPPELIN_NOTEBOOK_HOMESCREEN("zeppelin.notebook.homescreen", null), + // whether homescreen notebook will be hidden from notebook list or not + ZEPPELIN_NOTEBOOK_HOMESCREEN_HIDE("zeppelin.notebook.homescreen.hide", false), + ZEPPELIN_NOTEBOOK_S3_BUCKET("zeppelin.notebook.s3.bucket", "zeppelin"), + ZEPPELIN_NOTEBOOK_S3_ENDPOINT("zeppelin.notebook.s3.endpoint", "s3.amazonaws.com"), + ZEPPELIN_NOTEBOOK_S3_USER("zeppelin.notebook.s3.user", "user"), + ZEPPELIN_NOTEBOOK_S3_EMP("zeppelin.notebook.s3.encryptionMaterialsProvider", null), + ZEPPELIN_NOTEBOOK_S3_KMS_KEY_ID("zeppelin.notebook.s3.kmsKeyID", null), + ZEPPELIN_NOTEBOOK_S3_KMS_KEY_REGION("zeppelin.notebook.s3.kmsKeyRegion", null), + ZEPPELIN_NOTEBOOK_S3_SSE("zeppelin.notebook.s3.sse", false), + ZEPPELIN_NOTEBOOK_AZURE_CONNECTION_STRING("zeppelin.notebook.azure.connectionString", null), + ZEPPELIN_NOTEBOOK_AZURE_SHARE("zeppelin.notebook.azure.share", "zeppelin"), + ZEPPELIN_NOTEBOOK_AZURE_USER("zeppelin.notebook.azure.user", "user"), + ZEPPELIN_NOTEBOOK_MONGO_DATABASE("zeppelin.notebook.mongo.database", "zeppelin"), + ZEPPELIN_NOTEBOOK_MONGO_COLLECTION("zeppelin.notebook.mongo.collection", "notes"), + ZEPPELIN_NOTEBOOK_MONGO_URI("zeppelin.notebook.mongo.uri", "mongodb://localhost"), + ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT("zeppelin.notebook.mongo.autoimport", false), + ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage", + "org.apache.zeppelin.notebook.repo.GitNotebookRepo"), + ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC("zeppelin.notebook.one.way.sync", false), + // whether by default note is public or private + ZEPPELIN_NOTEBOOK_PUBLIC("zeppelin.notebook.public", true), + ZEPPELIN_INTERPRETER_REMOTE_RUNNER("zeppelin.interpreter.remoterunner", + System.getProperty("os.name") + .startsWith("Windows") ? 
"bin/interpreter.cmd" : "bin/interpreter.sh"), + // Decide when new note is created, interpreter settings will be binded automatically or not. + ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING("zeppelin.notebook.autoInterpreterBinding", true), + ZEPPELIN_CONF_DIR("zeppelin.conf.dir", "conf"), + ZEPPELIN_DEP_LOCALREPO("zeppelin.dep.localrepo", "local-repo"), + ZEPPELIN_HELIUM_REGISTRY("zeppelin.helium.registry", "helium," + HELIUM_PACKAGE_DEFAULT_URL), + ZEPPELIN_HELIUM_NODE_INSTALLER_URL("zeppelin.helium.node.installer.url", + "https://nodejs.org/dist/"), + ZEPPELIN_HELIUM_NPM_INSTALLER_URL("zeppelin.helium.npm.installer.url", + "http://registry.npmjs.org/"), + ZEPPELIN_HELIUM_YARNPKG_INSTALLER_URL("zeppelin.helium.yarnpkg.installer.url", + "https://github.com/yarnpkg/yarn/releases/download/"), + // Allows a way to specify a ',' separated list of allowed origins for rest and websockets + // i.e. http://localhost:8080 + ZEPPELIN_ALLOWED_ORIGINS("zeppelin.server.allowed.origins", "*"), + ZEPPELIN_ANONYMOUS_ALLOWED("zeppelin.anonymous.allowed", true), + ZEPPELIN_CREDENTIALS_PERSIST("zeppelin.credentials.persist", true), + ZEPPELIN_CREDENTIALS_ENCRYPT_KEY("zeppelin.credentials.encryptKey", null), + ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE("zeppelin.websocket.max.text.message.size", "1024000"), + ZEPPELIN_SERVER_DEFAULT_DIR_ALLOWED("zeppelin.server.default.dir.allowed", false), + ZEPPELIN_SERVER_XFRAME_OPTIONS("zeppelin.server.xframe.options", "SAMEORIGIN"), + ZEPPELIN_SERVER_JETTY_NAME("zeppelin.server.jetty.name", null), + ZEPPELIN_SERVER_STRICT_TRANSPORT("zeppelin.server.strict.transport", "max-age=631138519"), + ZEPPELIN_SERVER_X_XSS_PROTECTION("zeppelin.server.xxss.protection", "1"), + + ZEPPELIN_SERVER_KERBEROS_KEYTAB("zeppelin.server.kerberos.keytab", ""), + ZEPPELIN_SERVER_KERBEROS_PRINCIPAL("zeppelin.server.kerberos.principal", ""), + + ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE("zeppelin.interpreter.callback.portRange", ":"); + + private String varName; + 
@SuppressWarnings("rawtypes") + private Class varClass; + private String stringValue; + private VarType type; + private int intValue; + private float floatValue; + private boolean booleanValue; + private long longValue; + + + ConfVars(String varName, String varValue) { + this.varName = varName; + this.varClass = String.class; + this.stringValue = varValue; + this.intValue = -1; + this.floatValue = -1; + this.longValue = -1; + this.booleanValue = false; + this.type = VarType.STRING; + } + + ConfVars(String varName, int intValue) { + this.varName = varName; + this.varClass = Integer.class; + this.stringValue = null; + this.intValue = intValue; + this.floatValue = -1; + this.longValue = -1; + this.booleanValue = false; + this.type = VarType.INT; + } + + ConfVars(String varName, long longValue) { + this.varName = varName; + this.varClass = Integer.class; + this.stringValue = null; + this.intValue = -1; + this.floatValue = -1; + this.longValue = longValue; + this.booleanValue = false; + this.type = VarType.LONG; + } + + ConfVars(String varName, float floatValue) { + this.varName = varName; + this.varClass = Float.class; + this.stringValue = null; + this.intValue = -1; + this.longValue = -1; + this.floatValue = floatValue; + this.booleanValue = false; + this.type = VarType.FLOAT; + } + + ConfVars(String varName, boolean booleanValue) { + this.varName = varName; + this.varClass = Boolean.class; + this.stringValue = null; + this.intValue = -1; + this.longValue = -1; + this.floatValue = -1; + this.booleanValue = booleanValue; + this.type = VarType.BOOLEAN; + } + + public String getVarName() { + return varName; + } + + @SuppressWarnings("rawtypes") + public Class getVarClass() { + return varClass; + } + + public int getIntValue() { + return intValue; + } + + public long getLongValue() { + return longValue; + } + + public float getFloatValue() { + return floatValue; + } + + public String getStringValue() { + return stringValue; + } + + public boolean getBooleanValue() { + 
return booleanValue; + } + + public VarType getType() { + return type; + } + + enum VarType { + STRING { + @Override + void checkType(String value) throws Exception {} + }, + INT { + @Override + void checkType(String value) throws Exception { + Integer.valueOf(value); + } + }, + LONG { + @Override + void checkType(String value) throws Exception { + Long.valueOf(value); + } + }, + FLOAT { + @Override + void checkType(String value) throws Exception { + Float.valueOf(value); + } + }, + BOOLEAN { + @Override + void checkType(String value) throws Exception { + Boolean.valueOf(value); + } + }; + + boolean isType(String value) { + try { + checkType(value); + } catch (Exception e) { + LOG.error("Exception in ZeppelinConfiguration while isType", e); + return false; + } + return true; + } + + String typeString() { + return name().toUpperCase(); + } + + abstract void checkType(String value) throws Exception; + } + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java index 12376f0..9503962 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java @@ -168,7 +168,7 @@ public abstract class BaseZeppelinContext { interpreterContext.out.write(o.toString()); } } catch (IOException e) { - throw new InterpreterException(e); + throw new RuntimeException(e); } } @@ -229,14 +229,14 @@ public abstract class BaseZeppelinContext { public void run(String noteId, String paragraphId, InterpreterContext context, boolean checkCurrentParagraph) { if 
(paragraphId.equals(context.getParagraphId()) && checkCurrentParagraph) { - throw new InterpreterException("Can not run current Paragraph"); + throw new RuntimeException("Can not run current Paragraph"); } List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, paragraphId, context); if (runners.size() <= 0) { - throw new InterpreterException("Paragraph " + paragraphId + " not found " + runners.size()); + throw new RuntimeException("Paragraph " + paragraphId + " not found " + runners.size()); } for (InterpreterContextRunner r : runners) { @@ -255,7 +255,7 @@ public abstract class BaseZeppelinContext { List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, context); if (runners.size() <= 0) { - throw new InterpreterException("Note " + noteId + " not found " + runners.size()); + throw new RuntimeException("Note " + noteId + " not found " + runners.size()); } for (InterpreterContextRunner r : runners) { @@ -346,12 +346,12 @@ public abstract class BaseZeppelinContext { boolean checkCurrentParagraph) { List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, context); if (idx >= runners.size()) { - throw new InterpreterException("Index out of bound"); + throw new RuntimeException("Index out of bound"); } InterpreterContextRunner runner = runners.get(idx); if (runner.getParagraphId().equals(context.getParagraphId()) && checkCurrentParagraph) { - throw new InterpreterException("Can not run current Paragraph: " + runner.getParagraphId()); + throw new RuntimeException("Can not run current Paragraph: " + runner.getParagraphId()); } runner.run(); @@ -377,7 +377,7 @@ public abstract class BaseZeppelinContext { Integer idx = (Integer) idOrIdx; run(noteId, idx, context); } else { - throw new InterpreterException("Paragraph " + idOrIdx + " not found"); + throw new RuntimeException("Paragraph " + idOrIdx + " not found"); } } } 
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ClassloaderInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ClassloaderInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ClassloaderInterpreter.java index a1dafd9..f8afa45 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ClassloaderInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ClassloaderInterpreter.java @@ -51,7 +51,8 @@ public class ClassloaderInterpreter } @Override - public InterpreterResult interpret(String st, InterpreterContext context) { + public InterpreterResult interpret(String st, InterpreterContext context) + throws InterpreterException { ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(cl); try { @@ -68,7 +69,7 @@ public class ClassloaderInterpreter @Override - public void open() { + public void open() throws InterpreterException { ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(cl); try { @@ -82,7 +83,7 @@ public class ClassloaderInterpreter } @Override - public void close() { + public void close() throws InterpreterException { ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(cl); try { @@ -96,7 +97,7 @@ public class ClassloaderInterpreter } @Override - public void cancel(InterpreterContext context) { + public void cancel(InterpreterContext context) throws InterpreterException { ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(cl); try { @@ -110,13 +111,11 @@ public class ClassloaderInterpreter } @Override - public FormType getFormType() { + 
public FormType getFormType() throws InterpreterException { ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(cl); try { return intp.getFormType(); - } catch (Exception e) { - throw new InterpreterException(e); } finally { cl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(oldcl); @@ -124,7 +123,7 @@ public class ClassloaderInterpreter } @Override - public int getProgress(InterpreterContext context) { + public int getProgress(InterpreterContext context) throws InterpreterException { ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(cl); try { @@ -143,8 +142,6 @@ public class ClassloaderInterpreter Thread.currentThread().setContextClassLoader(cl); try { return intp.getScheduler(); - } catch (Exception e) { - throw new InterpreterException(e); } finally { cl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(oldcl); @@ -153,14 +150,12 @@ public class ClassloaderInterpreter @Override public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { + InterpreterContext interpreterContext) throws InterpreterException { ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(cl); try { List completion = intp.completion(buf, cursor, interpreterContext); return completion; - } catch (Exception e) { - throw new InterpreterException(e); } finally { cl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(oldcl); @@ -174,8 +169,6 @@ public class ClassloaderInterpreter Thread.currentThread().setContextClassLoader(cl); try { return intp.getClassName(); - } catch (Exception e) { - throw new InterpreterException(e); } finally { cl = Thread.currentThread().getContextClassLoader(); 
Thread.currentThread().setContextClassLoader(oldcl); @@ -188,8 +181,6 @@ public class ClassloaderInterpreter Thread.currentThread().setContextClassLoader(cl); try { intp.setInterpreterGroup(interpreterGroup); - } catch (Exception e) { - throw new InterpreterException(e); } finally { cl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(oldcl); @@ -202,8 +193,6 @@ public class ClassloaderInterpreter Thread.currentThread().setContextClassLoader(cl); try { return intp.getInterpreterGroup(); - } catch (Exception e) { - throw new InterpreterException(e); } finally { cl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(oldcl); @@ -216,8 +205,6 @@ public class ClassloaderInterpreter Thread.currentThread().setContextClassLoader(cl); try { intp.setClassloaderUrls(urls); - } catch (Exception e) { - throw new InterpreterException(e); } finally { cl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(oldcl); @@ -230,8 +217,6 @@ public class ClassloaderInterpreter Thread.currentThread().setContextClassLoader(cl); try { return intp.getClassloaderUrls(); - } catch (Exception e) { - throw new InterpreterException(e); } finally { cl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(oldcl); @@ -239,13 +224,11 @@ public class ClassloaderInterpreter } @Override - public void setProperty(Properties property) { + public void setProperties(Properties properties) { ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(cl); try { - intp.setProperty(property); - } catch (Exception e) { - throw new InterpreterException(e); + intp.setProperties(properties); } finally { cl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(oldcl); @@ -253,13 +236,11 @@ public class ClassloaderInterpreter } @Override - public Properties 
getProperty() { + public Properties getProperties() { ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(cl); try { - return intp.getProperty(); - } catch (Exception e) { - throw new InterpreterException(e); + return intp.getProperties(); } finally { cl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(oldcl); @@ -272,8 +253,6 @@ public class ClassloaderInterpreter Thread.currentThread().setContextClassLoader(cl); try { return intp.getProperty(key); - } catch (Exception e) { - throw new InterpreterException(e); } finally { cl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(oldcl); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java index 05599a0..386de41 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java @@ -31,6 +31,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.reflect.FieldUtils; import org.apache.zeppelin.annotation.Experimental; import org.apache.zeppelin.annotation.ZeppelinApi; +import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.scheduler.SchedulerFactory; @@ -55,20 +56,21 @@ public abstract class Interpreter { * open() is called only once */ @ZeppelinApi - public abstract void open(); + public abstract void open() throws 
InterpreterException; /** * Closes interpreter. You may want to free your resources up here. * close() is called only once */ @ZeppelinApi - public abstract void close(); + public abstract void close() throws InterpreterException; /** * Run precode if exists. */ @ZeppelinApi - public InterpreterResult executePrecode(InterpreterContext interpreterContext) { + public InterpreterResult executePrecode(InterpreterContext interpreterContext) + throws InterpreterException { String simpleName = this.getClass().getSimpleName(); String precode = getProperty(String.format("zeppelin.%s.precode", simpleName)); if (StringUtils.isNotBlank(precode)) { @@ -83,13 +85,15 @@ public abstract class Interpreter { * @param st statements to run */ @ZeppelinApi - public abstract InterpreterResult interpret(String st, InterpreterContext context); + public abstract InterpreterResult interpret(String st, + InterpreterContext context) + throws InterpreterException; /** * Optionally implement the canceling routine to abort interpret() method */ @ZeppelinApi - public abstract void cancel(InterpreterContext context); + public abstract void cancel(InterpreterContext context) throws InterpreterException; /** * Dynamic form handling @@ -99,7 +103,7 @@ public abstract class Interpreter { * FormType.NATIVE handles form in API */ @ZeppelinApi - public abstract FormType getFormType(); + public abstract FormType getFormType() throws InterpreterException; /** * get interpret() method running process in percentage. @@ -107,7 +111,7 @@ public abstract class Interpreter { * @return number between 0-100 */ @ZeppelinApi - public abstract int getProgress(InterpreterContext context); + public abstract int getProgress(InterpreterContext context) throws InterpreterException; /** * Get completion list based on cursor position. 
@@ -120,7 +124,7 @@ public abstract class Interpreter { */ @ZeppelinApi public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { + InterpreterContext interpreterContext) throws InterpreterException { return null; } @@ -144,22 +148,22 @@ public abstract class Interpreter { public static Logger logger = LoggerFactory.getLogger(Interpreter.class); private InterpreterGroup interpreterGroup; private URL[] classloaderUrls; - protected Properties property; - private String userName; + protected Properties properties; + protected String userName; @ZeppelinApi - public Interpreter(Properties property) { - this.property = property; + public Interpreter(Properties properties) { + this.properties = properties; } - public void setProperty(Properties property) { - this.property = property; + public void setProperties(Properties properties) { + this.properties = properties; } @ZeppelinApi - public Properties getProperty() { + public Properties getProperties() { Properties p = new Properties(); - p.putAll(property); + p.putAll(properties); RegisteredInterpreter registeredInterpreter = Interpreter.findRegisteredInterpreterByClassName( getClassName()); @@ -183,11 +187,22 @@ public abstract class Interpreter { @ZeppelinApi public String getProperty(String key) { - logger.debug("key: {}, value: {}", key, getProperty().getProperty(key)); + logger.debug("key: {}, value: {}", key, getProperties().getProperty(key)); - return getProperty().getProperty(key); + return getProperties().getProperty(key); } + @ZeppelinApi + public String getProperty(String key, String defaultValue) { + logger.debug("key: {}, value: {}", key, getProperties().getProperty(key, defaultValue)); + + return getProperties().getProperty(key, defaultValue); + } + + @ZeppelinApi + public void setProperty(String key, String value) { + properties.setProperty(key, value); + } public String getClassName() { return this.getClass().getName(); 
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterException.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterException.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterException.java index ebd184e..8b8a229 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterException.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterException.java @@ -17,11 +17,12 @@ package org.apache.zeppelin.interpreter; + /** * Runtime Exception for interpreters. * */ -public class InterpreterException extends RuntimeException { +public class InterpreterException extends Exception { public InterpreterException(Throwable e) { super(e); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java index 96f88ee..8e55c31 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java @@ -44,13 +44,13 @@ public class LazyOpenInterpreter } @Override - public void setProperty(Properties property) { - intp.setProperty(property); + public void setProperties(Properties properties) { + intp.setProperties(properties); } @Override - public Properties getProperty() { - return intp.getProperty(); + public Properties getProperties() { + return intp.getProperties(); } @Override @@ -59,7 +59,7 @@ public 
class LazyOpenInterpreter } @Override - public synchronized void open() { + public synchronized void open() throws InterpreterException { if (opened == true) { return; } @@ -73,12 +73,13 @@ public class LazyOpenInterpreter } @Override - public InterpreterResult executePrecode(InterpreterContext interpreterContext) { + public InterpreterResult executePrecode(InterpreterContext interpreterContext) + throws InterpreterException { return intp.executePrecode(interpreterContext); } @Override - public void close() { + public void close() throws InterpreterException { synchronized (intp) { if (opened == true) { intp.close(); @@ -94,7 +95,8 @@ public class LazyOpenInterpreter } @Override - public InterpreterResult interpret(String st, InterpreterContext context) { + public InterpreterResult interpret(String st, InterpreterContext context) + throws InterpreterException { open(); ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); try { @@ -105,18 +107,18 @@ public class LazyOpenInterpreter } @Override - public void cancel(InterpreterContext context) { + public void cancel(InterpreterContext context) throws InterpreterException { open(); intp.cancel(context); } @Override - public FormType getFormType() { + public FormType getFormType() throws InterpreterException { return intp.getFormType(); } @Override - public int getProgress(InterpreterContext context) { + public int getProgress(InterpreterContext context) throws InterpreterException { if (opened) { return intp.getProgress(context); } else { @@ -131,7 +133,7 @@ public class LazyOpenInterpreter @Override public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { + InterpreterContext interpreterContext) throws InterpreterException { open(); List completion = intp.completion(buf, cursor, interpreterContext); return completion; 
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java new file mode 100644 index 0000000..b991079 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.launcher; + +/** + * Interface to InterpreterClient which is created by InterpreterLauncher. 
This is the component + * that is used for the communication from zeppelin-server process to zeppelin interpreter process + */ +public interface InterpreterClient { + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java new file mode 100644 index 0000000..db8f8dd --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.interpreter.launcher; + +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.InterpreterOption; +import org.apache.zeppelin.interpreter.InterpreterRunner; + +import java.util.Properties; + +/** + * Context class for Interpreter Launch + */ +public class InterpreterLaunchContext { + + private Properties properties; + private InterpreterOption option; + private InterpreterRunner runner; + private String interpreterGroupId; + private String interpreterGroupName; + + public InterpreterLaunchContext(Properties properties, + InterpreterOption option, + InterpreterRunner runner, + String interpreterGroupId, + String interpreterGroupName) { + this.properties = properties; + this.option = option; + this.runner = runner; + this.interpreterGroupId = interpreterGroupId; + this.interpreterGroupName = interpreterGroupName; + } + + public Properties getProperties() { + return properties; + } + + public InterpreterOption getOption() { + return option; + } + + public InterpreterRunner getRunner() { + return runner; + } + + public String getInterpreterGroupId() { + return interpreterGroupId; + } + + public String getInterpreterGroupName() { + return interpreterGroupName; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java new file mode 100644 index 0000000..5d0acf3 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.launcher; + +import org.apache.zeppelin.conf.ZeppelinConfiguration; + +import java.io.IOException; +import java.util.Properties; + +/** + * Component to Launch interpreter process. + */ +public abstract class InterpreterLauncher { + + protected ZeppelinConfiguration zConf; + protected Properties properties; + + public InterpreterLauncher(ZeppelinConfiguration zConf) { + this.zConf = zConf; + } + + public abstract InterpreterClient launch(InterpreterLaunchContext context) throws IOException; +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterContextRunner.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterContextRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterContextRunner.java index 8d16ec5..74b8db6 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterContextRunner.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterContextRunner.java @@ 
-33,6 +33,6 @@ public class RemoteInterpreterContextRunner extends InterpreterContextRunner { public void run() { // this class should be used only for gson deserialize abstract class // code should not reach here - throw new InterpreterException("Assert"); + throw new RuntimeException("Assert"); } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 7f476e8..cb0488c 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -207,7 +207,11 @@ public class RemoteInterpreterServer if (interpreterGroup != null) { for (List<Interpreter> session : interpreterGroup.values()) { for (Interpreter interpreter : session) { - interpreter.close(); + try { + interpreter.close(); + } catch (InterpreterException e) { + logger.warn("Fail to close interpreter", e); + } } } } @@ -356,7 +360,11 @@ public class RemoteInterpreterServer public void open(String sessionId, String className) throws TException { logger.info(String.format("Open Interpreter %s for session %s ", className, sessionId)); Interpreter intp = getInterpreter(sessionId, className); - intp.open(); + try { + intp.open(); + } catch (InterpreterException e) { + throw new TException("Fail to open interpreter", e); + } } @Override @@ -388,7 +396,11 @@ public class RemoteInterpreterServer while (it.hasNext()) { Interpreter inp = it.next(); if (inp.getClassName().equals(className)) { - inp.close(); + try { + inp.close(); + } catch (InterpreterException e) 
{ + logger.warn("Fail to close interpreter", e); + } it.remove(); break; } @@ -655,7 +667,11 @@ public class RemoteInterpreterServer if (job != null) { job.setStatus(Status.ABORT); } else { - intp.cancel(convert(interpreterContext, null)); + try { + intp.cancel(convert(interpreterContext, null)); + } catch (InterpreterException e) { + throw new TException("Fail to cancel", e); + } } } @@ -672,7 +688,11 @@ public class RemoteInterpreterServer throw new TException("No interpreter {} existed for session {}".format( className, sessionId)); } - return intp.getProgress(convert(interpreterContext, null)); + try { + return intp.getProgress(convert(interpreterContext, null)); + } catch (InterpreterException e) { + throw new TException("Fail to getProgress", e); + } } } @@ -680,7 +700,11 @@ public class RemoteInterpreterServer @Override public String getFormType(String sessionId, String className) throws TException { Interpreter intp = getInterpreter(sessionId, className); - return intp.getFormType().toString(); + try { + return intp.getFormType().toString(); + } catch (InterpreterException e) { + throw new TException(e); + } } @Override @@ -688,8 +712,11 @@ public class RemoteInterpreterServer String className, String buf, int cursor, RemoteInterpreterContext remoteInterpreterContext) throws TException { Interpreter intp = getInterpreter(sessionId, className); - List completion = intp.completion(buf, cursor, convert(remoteInterpreterContext, null)); - return completion; + try { + return intp.completion(buf, cursor, convert(remoteInterpreterContext, null)); + } catch (InterpreterException e) { + throw new TException("Fail to get completion", e); + } } private InterpreterContext convert(RemoteInterpreterContext ric) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java ---------------------------------------------------------------------- diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java index 31c9225..d341b58 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java @@ -33,8 +33,8 @@ public class InterpreterTest { p.put("p1", "v1"); Interpreter intp = new DummyInterpreter(p); - assertEquals(1, intp.getProperty().size()); - assertEquals("v1", intp.getProperty().get("p1")); + assertEquals(1, intp.getProperties().size()); + assertEquals("v1", intp.getProperties().get("p1")); assertEquals("v1", intp.getProperty("p1")); } @@ -45,10 +45,10 @@ public class InterpreterTest { Interpreter intp = new DummyInterpreter(p); Properties overriddenProperty = new Properties(); overriddenProperty.put("p1", "v2"); - intp.setProperty(overriddenProperty); + intp.setProperties(overriddenProperty); - assertEquals(1, intp.getProperty().size()); - assertEquals("v2", intp.getProperty().get("p1")); + assertEquals(1, intp.getProperties().size()); + assertEquals("v2", intp.getProperties().get("p1")); assertEquals("v2", intp.getProperty("p1")); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/LazyOpenInterpreterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/LazyOpenInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/LazyOpenInterpreterTest.java index 26e835f..8e325f2 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/LazyOpenInterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/LazyOpenInterpreterTest.java @@ -28,7 +28,7 @@ public class LazyOpenInterpreterTest { 
Interpreter interpreter = mock(Interpreter.class); @Test - public void isOpenTest() { + public void isOpenTest() throws InterpreterException { InterpreterResult interpreterResult = new InterpreterResult(InterpreterResult.Code.SUCCESS, ""); when(interpreter.interpret(any(String.class), any(InterpreterContext.class))).thenReturn(interpreterResult);