http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java index c1dba5c..e2a10e6 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java @@ -123,7 +123,7 @@ public class InterpreterRestApi { request.getOption(), request.getProperties()); logger.info("new setting created with {}", interpreterSetting.getId()); return new JsonResponse<>(Status.OK, "", interpreterSetting).build(); - } catch (InterpreterException | IOException e) { + } catch (IOException e) { logger.error("Exception in InterpreterRestApi while creating ", e); return new JsonResponse<>(Status.NOT_FOUND, e.getMessage(), ExceptionUtils.getStackTrace(e)) .build();
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index 3e46449..2fa584b 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import org.apache.commons.io.FileUtils; +import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.notebook.Note; @@ -171,7 +172,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { } @Test - public void sparkRTest() throws IOException { + public void sparkRTest() throws IOException, InterpreterException { // create new note Note note = ZeppelinServer.notebook.createNote(anonymous); int sparkVersion = getSparkVersionNumber(note); @@ -426,7 +427,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { } @Test - public void pySparkDepLoaderTest() throws IOException { + public void pySparkDepLoaderTest() throws IOException, InterpreterException { // create new note Note note = ZeppelinServer.notebook.createNote(anonymous); int sparkVersionNumber = getSparkVersionNumber(note); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/pom.xml ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml index c67df6b..d1a2270 100644 --- a/zeppelin-zengine/pom.xml +++ b/zeppelin-zengine/pom.xml @@ -71,11 +71,6 @@ </dependency> 
<dependency> - <groupId>commons-configuration</groupId> - <artifactId>commons-configuration</artifactId> - </dependency> - - <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java deleted file mode 100644 index 3a82bc5..0000000 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ /dev/null @@ -1,847 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.zeppelin.conf; - -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.XMLConfiguration; -import org.apache.commons.configuration.tree.ConfigurationNode; -import org.apache.commons.lang.StringUtils; -import org.apache.zeppelin.util.Util; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.net.URL; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Zeppelin configuration. - * - */ -public class ZeppelinConfiguration extends XMLConfiguration { - private static final String ZEPPELIN_SITE_XML = "zeppelin-site.xml"; - private static final long serialVersionUID = 4749305895693848035L; - private static final Logger LOG = LoggerFactory.getLogger(ZeppelinConfiguration.class); - - private static final String HELIUM_PACKAGE_DEFAULT_URL = - "https://s3.amazonaws.com/helium-package/helium.json"; - private static ZeppelinConfiguration conf; - - public ZeppelinConfiguration(URL url) throws ConfigurationException { - setDelimiterParsingDisabled(true); - load(url); - } - - public ZeppelinConfiguration() { - ConfVars[] vars = ConfVars.values(); - for (ConfVars v : vars) { - if (v.getType() == ConfVars.VarType.BOOLEAN) { - this.setProperty(v.getVarName(), v.getBooleanValue()); - } else if (v.getType() == ConfVars.VarType.LONG) { - this.setProperty(v.getVarName(), v.getLongValue()); - } else if (v.getType() == ConfVars.VarType.INT) { - this.setProperty(v.getVarName(), v.getIntValue()); - } else if (v.getType() == ConfVars.VarType.FLOAT) { - this.setProperty(v.getVarName(), v.getFloatValue()); - } else if (v.getType() == ConfVars.VarType.STRING) { - this.setProperty(v.getVarName(), v.getStringValue()); - } else { - throw new RuntimeException("Unsupported VarType"); - } - } - - } - - - /** - * Load from resource. 
- *url = ZeppelinConfiguration.class.getResource(ZEPPELIN_SITE_XML); - * @throws ConfigurationException - */ - public static synchronized ZeppelinConfiguration create() { - if (conf != null) { - return conf; - } - - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - URL url; - - url = ZeppelinConfiguration.class.getResource(ZEPPELIN_SITE_XML); - if (url == null) { - ClassLoader cl = ZeppelinConfiguration.class.getClassLoader(); - if (cl != null) { - url = cl.getResource(ZEPPELIN_SITE_XML); - } - } - if (url == null) { - url = classLoader.getResource(ZEPPELIN_SITE_XML); - } - - if (url == null) { - LOG.warn("Failed to load configuration, proceeding with a default"); - conf = new ZeppelinConfiguration(); - } else { - try { - LOG.info("Load configuration from " + url); - conf = new ZeppelinConfiguration(url); - } catch (ConfigurationException e) { - LOG.warn("Failed to load configuration from " + url + " proceeding with a default", e); - conf = new ZeppelinConfiguration(); - } - } - - LOG.info("Server Host: " + conf.getServerAddress()); - if (conf.useSsl() == false) { - LOG.info("Server Port: " + conf.getServerPort()); - } else { - LOG.info("Server SSL Port: " + conf.getServerSslPort()); - } - LOG.info("Context Path: " + conf.getServerContextPath()); - LOG.info("Zeppelin Version: " + Util.getVersion()); - - return conf; - } - - - private String getStringValue(String name, String d) { - List<ConfigurationNode> properties = getRootNode().getChildren(); - if (properties == null || properties.isEmpty()) { - return d; - } - for (ConfigurationNode p : properties) { - if (p.getChildren("name") != null && !p.getChildren("name").isEmpty() - && name.equals(p.getChildren("name").get(0).getValue())) { - return (String) p.getChildren("value").get(0).getValue(); - } - } - return d; - } - - private int getIntValue(String name, int d) { - List<ConfigurationNode> properties = getRootNode().getChildren(); - if (properties == null || properties.isEmpty()) { - 
return d; - } - for (ConfigurationNode p : properties) { - if (p.getChildren("name") != null && !p.getChildren("name").isEmpty() - && name.equals(p.getChildren("name").get(0).getValue())) { - return Integer.parseInt((String) p.getChildren("value").get(0).getValue()); - } - } - return d; - } - - private long getLongValue(String name, long d) { - List<ConfigurationNode> properties = getRootNode().getChildren(); - if (properties == null || properties.isEmpty()) { - return d; - } - for (ConfigurationNode p : properties) { - if (p.getChildren("name") != null && !p.getChildren("name").isEmpty() - && name.equals(p.getChildren("name").get(0).getValue())) { - return Long.parseLong((String) p.getChildren("value").get(0).getValue()); - } - } - return d; - } - - private float getFloatValue(String name, float d) { - List<ConfigurationNode> properties = getRootNode().getChildren(); - if (properties == null || properties.isEmpty()) { - return d; - } - for (ConfigurationNode p : properties) { - if (p.getChildren("name") != null && !p.getChildren("name").isEmpty() - && name.equals(p.getChildren("name").get(0).getValue())) { - return Float.parseFloat((String) p.getChildren("value").get(0).getValue()); - } - } - return d; - } - - private boolean getBooleanValue(String name, boolean d) { - List<ConfigurationNode> properties = getRootNode().getChildren(); - if (properties == null || properties.isEmpty()) { - return d; - } - for (ConfigurationNode p : properties) { - if (p.getChildren("name") != null && !p.getChildren("name").isEmpty() - && name.equals(p.getChildren("name").get(0).getValue())) { - return Boolean.parseBoolean((String) p.getChildren("value").get(0).getValue()); - } - } - return d; - } - - public String getString(ConfVars c) { - return getString(c.name(), c.getVarName(), c.getStringValue()); - } - - public String getString(String envName, String propertyName, String defaultValue) { - if (System.getenv(envName) != null) { - return System.getenv(envName); - } - if 
(System.getProperty(propertyName) != null) { - return System.getProperty(propertyName); - } - - return getStringValue(propertyName, defaultValue); - } - - public int getInt(ConfVars c) { - return getInt(c.name(), c.getVarName(), c.getIntValue()); - } - - public int getInt(String envName, String propertyName, int defaultValue) { - if (System.getenv(envName) != null) { - return Integer.parseInt(System.getenv(envName)); - } - - if (System.getProperty(propertyName) != null) { - return Integer.parseInt(System.getProperty(propertyName)); - } - return getIntValue(propertyName, defaultValue); - } - - public long getLong(ConfVars c) { - return getLong(c.name(), c.getVarName(), c.getLongValue()); - } - - public long getLong(String envName, String propertyName, long defaultValue) { - if (System.getenv(envName) != null) { - return Long.parseLong(System.getenv(envName)); - } - - if (System.getProperty(propertyName) != null) { - return Long.parseLong(System.getProperty(propertyName)); - } - return getLongValue(propertyName, defaultValue); - } - - public float getFloat(ConfVars c) { - return getFloat(c.name(), c.getVarName(), c.getFloatValue()); - } - - public float getFloat(String envName, String propertyName, float defaultValue) { - if (System.getenv(envName) != null) { - return Float.parseFloat(System.getenv(envName)); - } - if (System.getProperty(propertyName) != null) { - return Float.parseFloat(System.getProperty(propertyName)); - } - return getFloatValue(propertyName, defaultValue); - } - - public boolean getBoolean(ConfVars c) { - return getBoolean(c.name(), c.getVarName(), c.getBooleanValue()); - } - - public boolean getBoolean(String envName, String propertyName, boolean defaultValue) { - if (System.getenv(envName) != null) { - return Boolean.parseBoolean(System.getenv(envName)); - } - - if (System.getProperty(propertyName) != null) { - return Boolean.parseBoolean(System.getProperty(propertyName)); - } - return getBooleanValue(propertyName, defaultValue); - } - - public 
boolean useSsl() { - return getBoolean(ConfVars.ZEPPELIN_SSL); - } - - public int getServerSslPort() { - return getInt(ConfVars.ZEPPELIN_SSL_PORT); - } - - public boolean useClientAuth() { - return getBoolean(ConfVars.ZEPPELIN_SSL_CLIENT_AUTH); - } - - public String getServerAddress() { - return getString(ConfVars.ZEPPELIN_ADDR); - } - - public int getServerPort() { - return getInt(ConfVars.ZEPPELIN_PORT); - } - - public String getServerContextPath() { - return getString(ConfVars.ZEPPELIN_SERVER_CONTEXT_PATH); - } - - public String getKeyStorePath() { - String path = getString(ConfVars.ZEPPELIN_SSL_KEYSTORE_PATH); - if (path != null && path.startsWith("/") || isWindowsPath(path)) { - return path; - } else { - return getRelativeDir( - String.format("%s/%s", - getConfDir(), - path)); - } - } - - public String getKeyStoreType() { - return getString(ConfVars.ZEPPELIN_SSL_KEYSTORE_TYPE); - } - - public String getKeyStorePassword() { - return getString(ConfVars.ZEPPELIN_SSL_KEYSTORE_PASSWORD); - } - - public String getKeyManagerPassword() { - String password = getString(ConfVars.ZEPPELIN_SSL_KEY_MANAGER_PASSWORD); - if (password == null) { - return getKeyStorePassword(); - } else { - return password; - } - } - - public String getTrustStorePath() { - String path = getString(ConfVars.ZEPPELIN_SSL_TRUSTSTORE_PATH); - if (path == null) { - path = getKeyStorePath(); - } - if (path != null && path.startsWith("/") || isWindowsPath(path)) { - return path; - } else { - return getRelativeDir( - String.format("%s/%s", - getConfDir(), - path)); - } - } - - public String getTrustStoreType() { - String type = getString(ConfVars.ZEPPELIN_SSL_TRUSTSTORE_TYPE); - if (type == null) { - return getKeyStoreType(); - } else { - return type; - } - } - - public String getTrustStorePassword() { - String password = getString(ConfVars.ZEPPELIN_SSL_TRUSTSTORE_PASSWORD); - if (password == null) { - return getKeyStorePassword(); - } else { - return password; - } - } - - public String getNotebookDir() 
{ - return getString(ConfVars.ZEPPELIN_NOTEBOOK_DIR); - } - - public String getUser() { - return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_USER); - } - - public String getBucketName() { - return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_BUCKET); - } - - public String getEndpoint() { - return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_ENDPOINT); - } - - public String getS3KMSKeyID() { - return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_KMS_KEY_ID); - } - - public String getS3KMSKeyRegion() { - return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_KMS_KEY_REGION); - } - - public String getS3EncryptionMaterialsProviderClass() { - return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_EMP); - } - - public boolean isS3ServerSideEncryption() { - return getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_S3_SSE); - } - - public String getMongoUri() { - return getString(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_URI); - } - - public String getMongoDatabase() { - return getString(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_DATABASE); - } - - public String getMongoCollection() { - return getString(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_COLLECTION); - } - - public boolean getMongoAutoimport() { - return getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT); - } - - public String getInterpreterListPath() { - return getRelativeDir(String.format("%s/interpreter-list", getConfDir())); - } - - public String getInterpreterDir() { - return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_DIR); - } - - public String getInterpreterJson() { - return getString(ConfVars.ZEPPELIN_INTERPRETER_JSON); - } - - public String getInterpreterSettingPath() { - return getRelativeDir(String.format("%s/interpreter.json", getConfDir())); - } - - public String getHeliumConfPath() { - return getRelativeDir(String.format("%s/helium.json", getConfDir())); - } - - public String getHeliumRegistry() { - return getRelativeDir(ConfVars.ZEPPELIN_HELIUM_REGISTRY); - } - - public String getHeliumNodeInstallerUrl() { - return getString(ConfVars.ZEPPELIN_HELIUM_NODE_INSTALLER_URL); 
- } - - public String getHeliumNpmInstallerUrl() { - return getString(ConfVars.ZEPPELIN_HELIUM_NPM_INSTALLER_URL); - } - - public String getHeliumYarnInstallerUrl() { - return getString(ConfVars.ZEPPELIN_HELIUM_YARNPKG_INSTALLER_URL); - } - - public String getNotebookAuthorizationPath() { - return getRelativeDir(String.format("%s/notebook-authorization.json", getConfDir())); - } - - public Boolean credentialsPersist() { - return getBoolean(ConfVars.ZEPPELIN_CREDENTIALS_PERSIST); - } - - public String getCredentialsEncryptKey() { - return getString(ConfVars.ZEPPELIN_CREDENTIALS_ENCRYPT_KEY); - } - - public String getCredentialsPath() { - return getRelativeDir(String.format("%s/credentials.json", getConfDir())); - } - - public String getShiroPath() { - String shiroPath = getRelativeDir(String.format("%s/shiro.ini", getConfDir())); - return new File(shiroPath).exists() ? shiroPath : StringUtils.EMPTY; - } - - public String getInterpreterRemoteRunnerPath() { - return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER); - } - - public String getInterpreterLocalRepoPath() { - return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO); - } - - public String getInterpreterMvnRepoPath() { - return getString(ConfVars.ZEPPELIN_INTERPRETER_DEP_MVNREPO); - } - - public String getRelativeDir(ConfVars c) { - return getRelativeDir(getString(c)); - } - - public String getRelativeDir(String path) { - if (path != null && path.startsWith("/") || isWindowsPath(path)) { - return path; - } else { - return getString(ConfVars.ZEPPELIN_HOME) + "/" + path; - } - } - - public String getCallbackPortRange() { - return getString(ConfVars.ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE); - } - - public boolean isWindowsPath(String path){ - return path.matches("^[A-Za-z]:\\\\.*"); - } - - public boolean isAnonymousAllowed() { - return getBoolean(ConfVars.ZEPPELIN_ANONYMOUS_ALLOWED); - } - - public boolean isNotebokPublic() { - return getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_PUBLIC); - } - - 
public String getConfDir() { - return getRelativeDir(ConfVars.ZEPPELIN_CONF_DIR); - } - - public List<String> getAllowedOrigins() - { - if (getString(ConfVars.ZEPPELIN_ALLOWED_ORIGINS).isEmpty()) { - return Arrays.asList(new String[0]); - } - - return Arrays.asList(getString(ConfVars.ZEPPELIN_ALLOWED_ORIGINS).toLowerCase().split(",")); - } - - public String getWebsocketMaxTextMessageSize() { - return getString(ConfVars.ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE); - } - - public String getJettyName() { - return getString(ConfVars.ZEPPELIN_SERVER_JETTY_NAME); - } - - - public String getXFrameOptions() { - return getString(ConfVars.ZEPPELIN_SERVER_XFRAME_OPTIONS); - } - - public String getXxssProtection() { - return getString(ConfVars.ZEPPELIN_SERVER_X_XSS_PROTECTION); - } - - public String getStrictTransport() { - return getString(ConfVars.ZEPPELIN_SERVER_STRICT_TRANSPORT); - } - - - public Map<String, String> dumpConfigurations(ZeppelinConfiguration conf, - ConfigurationKeyPredicate predicate) { - Map<String, String> configurations = new HashMap<>(); - - for (ConfVars v : ConfVars.values()) { - String key = v.getVarName(); - - if (!predicate.apply(key)) { - continue; - } - - ConfVars.VarType type = v.getType(); - Object value = null; - if (type == ConfVars.VarType.BOOLEAN) { - value = conf.getBoolean(v); - } else if (type == ConfVars.VarType.LONG) { - value = conf.getLong(v); - } else if (type == ConfVars.VarType.INT) { - value = conf.getInt(v); - } else if (type == ConfVars.VarType.FLOAT) { - value = conf.getFloat(v); - } else if (type == ConfVars.VarType.STRING) { - value = conf.getString(v); - } - - if (value != null) { - configurations.put(key, value.toString()); - } - } - return configurations; - } - - /** - * Predication whether key/value pair should be included or not - */ - public interface ConfigurationKeyPredicate { - boolean apply(String key); - } - - /** - * Wrapper class. 
- */ - public static enum ConfVars { - ZEPPELIN_HOME("zeppelin.home", "./"), - ZEPPELIN_ADDR("zeppelin.server.addr", "0.0.0.0"), - ZEPPELIN_PORT("zeppelin.server.port", 8080), - ZEPPELIN_SERVER_CONTEXT_PATH("zeppelin.server.context.path", "/"), - ZEPPELIN_SSL("zeppelin.ssl", false), - ZEPPELIN_SSL_PORT("zeppelin.server.ssl.port", 8443), - ZEPPELIN_SSL_CLIENT_AUTH("zeppelin.ssl.client.auth", false), - ZEPPELIN_SSL_KEYSTORE_PATH("zeppelin.ssl.keystore.path", "keystore"), - ZEPPELIN_SSL_KEYSTORE_TYPE("zeppelin.ssl.keystore.type", "JKS"), - ZEPPELIN_SSL_KEYSTORE_PASSWORD("zeppelin.ssl.keystore.password", ""), - ZEPPELIN_SSL_KEY_MANAGER_PASSWORD("zeppelin.ssl.key.manager.password", null), - ZEPPELIN_SSL_TRUSTSTORE_PATH("zeppelin.ssl.truststore.path", null), - ZEPPELIN_SSL_TRUSTSTORE_TYPE("zeppelin.ssl.truststore.type", null), - ZEPPELIN_SSL_TRUSTSTORE_PASSWORD("zeppelin.ssl.truststore.password", null), - ZEPPELIN_WAR("zeppelin.war", "zeppelin-web/dist"), - ZEPPELIN_WAR_TEMPDIR("zeppelin.war.tempdir", "webapps"), - ZEPPELIN_INTERPRETERS("zeppelin.interpreters", "org.apache.zeppelin.spark.SparkInterpreter," - + "org.apache.zeppelin.spark.PySparkInterpreter," - + "org.apache.zeppelin.rinterpreter.RRepl," - + "org.apache.zeppelin.rinterpreter.KnitR," - + "org.apache.zeppelin.spark.SparkRInterpreter," - + "org.apache.zeppelin.spark.SparkSqlInterpreter," - + "org.apache.zeppelin.spark.DepInterpreter," - + "org.apache.zeppelin.markdown.Markdown," - + "org.apache.zeppelin.angular.AngularInterpreter," - + "org.apache.zeppelin.shell.ShellInterpreter," - + "org.apache.zeppelin.livy.LivySparkInterpreter," - + "org.apache.zeppelin.livy.LivySparkSQLInterpreter," - + "org.apache.zeppelin.livy.LivyPySparkInterpreter," - + "org.apache.zeppelin.livy.LivyPySpark3Interpreter," - + "org.apache.zeppelin.livy.LivySparkRInterpreter," - + "org.apache.zeppelin.alluxio.AlluxioInterpreter," - + "org.apache.zeppelin.file.HDFSFileInterpreter," - + "org.apache.zeppelin.pig.PigInterpreter," - + 
"org.apache.zeppelin.pig.PigQueryInterpreter," - + "org.apache.zeppelin.flink.FlinkInterpreter," - + "org.apache.zeppelin.python.PythonInterpreter," - + "org.apache.zeppelin.python.PythonInterpreterPandasSql," - + "org.apache.zeppelin.python.PythonCondaInterpreter," - + "org.apache.zeppelin.python.PythonDockerInterpreter," - + "org.apache.zeppelin.ignite.IgniteInterpreter," - + "org.apache.zeppelin.ignite.IgniteSqlInterpreter," - + "org.apache.zeppelin.lens.LensInterpreter," - + "org.apache.zeppelin.cassandra.CassandraInterpreter," - + "org.apache.zeppelin.geode.GeodeOqlInterpreter," - + "org.apache.zeppelin.kylin.KylinInterpreter," - + "org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter," - + "org.apache.zeppelin.scalding.ScaldingInterpreter," - + "org.apache.zeppelin.jdbc.JDBCInterpreter," - + "org.apache.zeppelin.hbase.HbaseInterpreter," - + "org.apache.zeppelin.bigquery.BigQueryInterpreter," - + "org.apache.zeppelin.beam.BeamInterpreter," - + "org.apache.zeppelin.scio.ScioInterpreter," - + "org.apache.zeppelin.groovy.GroovyInterpreter," - + "org.apache.zeppelin.neo4j.Neo4jCypherInterpreter" - ), - ZEPPELIN_INTERPRETER_JSON("zeppelin.interpreter.setting", "interpreter-setting.json"), - ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"), - ZEPPELIN_INTERPRETER_LOCALREPO("zeppelin.interpreter.localRepo", "local-repo"), - ZEPPELIN_INTERPRETER_DEP_MVNREPO("zeppelin.interpreter.dep.mvnRepo", - "http://repo1.maven.org/maven2/"), - ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000), - ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10), - ZEPPELIN_INTERPRETER_GROUP_ORDER("zeppelin.interpreter.group.order", "spark,md,angular,sh," - + "livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch," - + "scalding,jdbc,hbase,bigquery,beam,pig,scio,groovy,neo4j"), - ZEPPELIN_INTERPRETER_OUTPUT_LIMIT("zeppelin.interpreter.output.limit", 1024 * 100), - 
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"), - ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"), - // use specified notebook (id) as homescreen - ZEPPELIN_NOTEBOOK_HOMESCREEN("zeppelin.notebook.homescreen", null), - // whether homescreen notebook will be hidden from notebook list or not - ZEPPELIN_NOTEBOOK_HOMESCREEN_HIDE("zeppelin.notebook.homescreen.hide", false), - ZEPPELIN_NOTEBOOK_S3_BUCKET("zeppelin.notebook.s3.bucket", "zeppelin"), - ZEPPELIN_NOTEBOOK_S3_ENDPOINT("zeppelin.notebook.s3.endpoint", "s3.amazonaws.com"), - ZEPPELIN_NOTEBOOK_S3_USER("zeppelin.notebook.s3.user", "user"), - ZEPPELIN_NOTEBOOK_S3_EMP("zeppelin.notebook.s3.encryptionMaterialsProvider", null), - ZEPPELIN_NOTEBOOK_S3_KMS_KEY_ID("zeppelin.notebook.s3.kmsKeyID", null), - ZEPPELIN_NOTEBOOK_S3_KMS_KEY_REGION("zeppelin.notebook.s3.kmsKeyRegion", null), - ZEPPELIN_NOTEBOOK_S3_SSE("zeppelin.notebook.s3.sse", false), - ZEPPELIN_NOTEBOOK_AZURE_CONNECTION_STRING("zeppelin.notebook.azure.connectionString", null), - ZEPPELIN_NOTEBOOK_AZURE_SHARE("zeppelin.notebook.azure.share", "zeppelin"), - ZEPPELIN_NOTEBOOK_AZURE_USER("zeppelin.notebook.azure.user", "user"), - ZEPPELIN_NOTEBOOK_MONGO_DATABASE("zeppelin.notebook.mongo.database", "zeppelin"), - ZEPPELIN_NOTEBOOK_MONGO_COLLECTION("zeppelin.notebook.mongo.collection", "notes"), - ZEPPELIN_NOTEBOOK_MONGO_URI("zeppelin.notebook.mongo.uri", "mongodb://localhost"), - ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT("zeppelin.notebook.mongo.autoimport", false), - ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage", - "org.apache.zeppelin.notebook.repo.GitNotebookRepo"), - ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC("zeppelin.notebook.one.way.sync", false), - // whether by default note is public or private - ZEPPELIN_NOTEBOOK_PUBLIC("zeppelin.notebook.public", true), - ZEPPELIN_INTERPRETER_REMOTE_RUNNER("zeppelin.interpreter.remoterunner", - System.getProperty("os.name") - .startsWith("Windows") ? 
"bin/interpreter.cmd" : "bin/interpreter.sh"), - // Decide when new note is created, interpreter settings will be binded automatically or not. - ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING("zeppelin.notebook.autoInterpreterBinding", true), - ZEPPELIN_CONF_DIR("zeppelin.conf.dir", "conf"), - ZEPPELIN_DEP_LOCALREPO("zeppelin.dep.localrepo", "local-repo"), - ZEPPELIN_HELIUM_REGISTRY("zeppelin.helium.registry", "helium," + HELIUM_PACKAGE_DEFAULT_URL), - ZEPPELIN_HELIUM_NODE_INSTALLER_URL("zeppelin.helium.node.installer.url", - "https://nodejs.org/dist/"), - ZEPPELIN_HELIUM_NPM_INSTALLER_URL("zeppelin.helium.npm.installer.url", - "http://registry.npmjs.org/"), - ZEPPELIN_HELIUM_YARNPKG_INSTALLER_URL("zeppelin.helium.yarnpkg.installer.url", - "https://github.com/yarnpkg/yarn/releases/download/"), - // Allows a way to specify a ',' separated list of allowed origins for rest and websockets - // i.e. http://localhost:8080 - ZEPPELIN_ALLOWED_ORIGINS("zeppelin.server.allowed.origins", "*"), - ZEPPELIN_ANONYMOUS_ALLOWED("zeppelin.anonymous.allowed", true), - ZEPPELIN_CREDENTIALS_PERSIST("zeppelin.credentials.persist", true), - ZEPPELIN_CREDENTIALS_ENCRYPT_KEY("zeppelin.credentials.encryptKey", null), - ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE("zeppelin.websocket.max.text.message.size", "1024000"), - ZEPPELIN_SERVER_DEFAULT_DIR_ALLOWED("zeppelin.server.default.dir.allowed", false), - ZEPPELIN_SERVER_XFRAME_OPTIONS("zeppelin.server.xframe.options", "SAMEORIGIN"), - ZEPPELIN_SERVER_JETTY_NAME("zeppelin.server.jetty.name", null), - ZEPPELIN_SERVER_STRICT_TRANSPORT("zeppelin.server.strict.transport", "max-age=631138519"), - ZEPPELIN_SERVER_X_XSS_PROTECTION("zeppelin.server.xxss.protection", "1"), - - ZEPPELIN_SERVER_KERBEROS_KEYTAB("zeppelin.server.kerberos.keytab", ""), - ZEPPELIN_SERVER_KERBEROS_PRINCIPAL("zeppelin.server.kerberos.principal", ""), - - ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE("zeppelin.interpreter.callback.portRange", ":"); - - private String varName; - 
@SuppressWarnings("rawtypes") - private Class varClass; - private String stringValue; - private VarType type; - private int intValue; - private float floatValue; - private boolean booleanValue; - private long longValue; - - - ConfVars(String varName, String varValue) { - this.varName = varName; - this.varClass = String.class; - this.stringValue = varValue; - this.intValue = -1; - this.floatValue = -1; - this.longValue = -1; - this.booleanValue = false; - this.type = VarType.STRING; - } - - ConfVars(String varName, int intValue) { - this.varName = varName; - this.varClass = Integer.class; - this.stringValue = null; - this.intValue = intValue; - this.floatValue = -1; - this.longValue = -1; - this.booleanValue = false; - this.type = VarType.INT; - } - - ConfVars(String varName, long longValue) { - this.varName = varName; - this.varClass = Integer.class; - this.stringValue = null; - this.intValue = -1; - this.floatValue = -1; - this.longValue = longValue; - this.booleanValue = false; - this.type = VarType.LONG; - } - - ConfVars(String varName, float floatValue) { - this.varName = varName; - this.varClass = Float.class; - this.stringValue = null; - this.intValue = -1; - this.longValue = -1; - this.floatValue = floatValue; - this.booleanValue = false; - this.type = VarType.FLOAT; - } - - ConfVars(String varName, boolean booleanValue) { - this.varName = varName; - this.varClass = Boolean.class; - this.stringValue = null; - this.intValue = -1; - this.longValue = -1; - this.floatValue = -1; - this.booleanValue = booleanValue; - this.type = VarType.BOOLEAN; - } - - public String getVarName() { - return varName; - } - - @SuppressWarnings("rawtypes") - public Class getVarClass() { - return varClass; - } - - public int getIntValue() { - return intValue; - } - - public long getLongValue() { - return longValue; - } - - public float getFloatValue() { - return floatValue; - } - - public String getStringValue() { - return stringValue; - } - - public boolean getBooleanValue() { - 
return booleanValue; - } - - public VarType getType() { - return type; - } - - enum VarType { - STRING { - @Override - void checkType(String value) throws Exception {} - }, - INT { - @Override - void checkType(String value) throws Exception { - Integer.valueOf(value); - } - }, - LONG { - @Override - void checkType(String value) throws Exception { - Long.valueOf(value); - } - }, - FLOAT { - @Override - void checkType(String value) throws Exception { - Float.valueOf(value); - } - }, - BOOLEAN { - @Override - void checkType(String value) throws Exception { - Boolean.valueOf(value); - } - }; - - boolean isType(String value) { - try { - checkType(value); - } catch (Exception e) { - LOG.error("Exception in ZeppelinConfiguration while isType", e); - return false; - } - return true; - } - - String typeString() { - return name().toUpperCase(); - } - - abstract void checkType(String value) throws Exception; - } - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index f020919..7233239 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -77,7 +77,7 @@ public class InterpreterFactory { return interpreter; } } - throw new InterpreterException(replName + " interpreter not found"); + return null; } else { // first assume replName is 'name' of interpreter. 
('groupName' is ommitted) http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java index 5af01dc..a82d5bf 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java @@ -18,7 +18,6 @@ package org.apache.zeppelin.interpreter; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.gson.JsonArray; @@ -34,19 +33,22 @@ import org.apache.zeppelin.dep.DependencyResolver; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.AngularObjectRegistryListener; import org.apache.zeppelin.helium.ApplicationEventListener; +import org.apache.zeppelin.interpreter.launcher.InterpreterLaunchContext; +import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher; +import org.apache.zeppelin.interpreter.launcher.ShellScriptLauncher; +import org.apache.zeppelin.interpreter.launcher.SparkInterpreterLauncher; import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; -import 
org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileNotFoundException; -import java.io.FilenameFilter; +import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.net.URL; @@ -58,7 +60,6 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.NoSuchElementException; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -132,6 +133,10 @@ public class InterpreterSetting { private transient ZeppelinConfiguration conf = new ZeppelinConfiguration(); + // TODO(zjffdu) ShellScriptLauncher is the only launcher implementation for now. It could be another + // launcher in the future when we have other launcher implementations, e.g. a third-party launcher + // service like Livy. + private transient InterpreterLauncher launcher; /////////////////////////////////////////////////////////////////////////////////////////// @@ -243,6 +248,7 @@ public class InterpreterSetting { } void postProcessing() { +// createLauncher(); this.status = Status.READY; } @@ -266,6 +272,14 @@ public class InterpreterSetting { this.conf = o.getConf(); } + private void createLauncher() { + if (group.equals("spark")) { + this.launcher = new SparkInterpreterLauncher(this.conf); + } else { + this.launcher = new ShellScriptLauncher(this.conf); + } + } + public AngularObjectRegistryListener getAngularObjectRegistryListener() { return angularObjectRegistryListener; } @@ -626,152 +640,17 @@ public class InterpreterSetting { } return interpreters; } - - RemoteInterpreterProcess createInterpreterProcess() { - RemoteInterpreterProcess remoteInterpreterProcess = null; - int connectTimeout = - 
conf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); - String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + id; - if (option.isExistingProcess()) { - // TODO(zjffdu) remove the existing process approach seems no one is using this. - // use the existing process - remoteInterpreterProcess = new RemoteInterpreterRunningProcess( - connectTimeout, - remoteInterpreterProcessListener, - appEventListener, - option.getHost(), - option.getPort()); - } else { - // create new remote process - remoteInterpreterProcess = new RemoteInterpreterManagedProcess( - interpreterRunner != null ? interpreterRunner.getPath() : - conf.getInterpreterRemoteRunnerPath(), conf.getCallbackPortRange(), - interpreterDir, localRepoPath, - getEnvFromInterpreterProperty(), connectTimeout, - remoteInterpreterProcessListener, appEventListener, group); - } - return remoteInterpreterProcess; - } - - private boolean isSparkConf(String key, String value) { - return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value); - } - - private Map<String, String> getEnvFromInterpreterProperty() { - Map<String, String> env = new HashMap<String, String>(); - Properties javaProperties = getJavaProperties(); - Properties sparkProperties = new Properties(); - String sparkMaster = getSparkMaster(); - for (String key : javaProperties.stringPropertyNames()) { - if (RemoteInterpreterUtils.isEnvString(key)) { - env.put(key, javaProperties.getProperty(key)); - } - if (isSparkConf(key, javaProperties.getProperty(key))) { - sparkProperties.setProperty(key, toShellFormat(javaProperties.getProperty(key))); - } - } - - setupPropertiesForPySpark(sparkProperties); - setupPropertiesForSparkR(sparkProperties, System.getenv("SPARK_HOME")); - if (isYarnMode() && getDeployMode().equals("cluster")) { - env.put("SPARK_YARN_CLUSTER", "true"); - } - - StringBuilder sparkConfBuilder = new StringBuilder(); - if (sparkMaster != null) { - sparkConfBuilder.append(" --master 
" + sparkMaster); - } - if (isYarnMode() && getDeployMode().equals("cluster")) { - sparkConfBuilder.append(" --files " + conf.getConfDir() + "/log4j_yarn_cluster.properties"); - } - for (String name : sparkProperties.stringPropertyNames()) { - sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name)); - } - - env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString()); - LOGGER.debug("getEnvFromInterpreterProperty: " + env); - return env; - } - - private void setupPropertiesForPySpark(Properties sparkProperties) { - if (isYarnMode()) { - sparkProperties.setProperty("spark.yarn.isPython", "true"); - } - } - private void mergeSparkProperty(Properties sparkProperties, String propertyName, - String propertyValue) { - if (sparkProperties.containsKey(propertyName)) { - String oldPropertyValue = sparkProperties.getProperty(propertyName); - sparkProperties.setProperty(propertyName, oldPropertyValue + "," + propertyValue); - } else { - sparkProperties.setProperty(propertyName, propertyValue); - } - } - - private void setupPropertiesForSparkR(Properties sparkProperties, - String sparkHome) { - File sparkRBasePath = null; - if (sparkHome == null) { - if (!getSparkMaster().startsWith("local")) { - throw new RuntimeException("SPARK_HOME is not specified for non-local mode"); - } - String zeppelinHome = conf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME); - sparkRBasePath = new File(zeppelinHome, - "interpreter" + File.separator + "spark" + File.separator + "R"); - } else { - sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib"); - } - - File sparkRPath = new File(sparkRBasePath, "sparkr.zip"); - if (sparkRPath.exists() && sparkRPath.isFile()) { - mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives", sparkRPath.getAbsolutePath()); - } else { - LOGGER.warn("sparkr.zip is not found, SparkR may not work."); - } - } - - private String getSparkMaster() { - String master = getJavaProperties().getProperty("master"); - if 
(master == null) { - master = getJavaProperties().getProperty("spark.master", "local[*]"); - } - return master; - } - - private String getDeployMode() { - String master = getSparkMaster(); - if (master.equals("yarn-client")) { - return "client"; - } else if (master.equals("yarn-cluster")) { - return "cluster"; - } else if (master.startsWith("local")) { - return "client"; - } else { - String deployMode = getJavaProperties().getProperty("spark.submit.deployMode"); - if (deployMode == null) { - throw new RuntimeException("master is set as yarn, but spark.submit.deployMode " + - "is not specified"); - } - if (!deployMode.equals("client") && !deployMode.equals("cluster")) { - throw new RuntimeException("Invalid value for spark.submit.deployMode: " + deployMode); - } - return deployMode; - } - } - - private boolean isYarnMode() { - return getSparkMaster().startsWith("yarn"); - } - - private String toShellFormat(String value) { - if (value.contains("\'") && value.contains("\"")) { - throw new RuntimeException("Spark property value could not contain both \" and '"); - } else if (value.contains("\'")) { - return "\"" + value + "\""; - } else { - return "\'" + value + "\'"; + synchronized RemoteInterpreterProcess createInterpreterProcess() throws IOException { + if (launcher == null) { + createLauncher(); } + InterpreterLaunchContext launchContext = new + InterpreterLaunchContext(getJavaProperties(), option, interpreterRunner, id, name); + RemoteInterpreterProcess process = (RemoteInterpreterProcess) launcher.launch(launchContext); + process.setRemoteInterpreterEventPoller( + new RemoteInterpreterEventPoller(remoteInterpreterProcessListener, appEventListener)); + return process; } private List<Interpreter> getOrCreateSession(String user, String noteId) { @@ -815,8 +694,7 @@ public class InterpreterSetting { return null; } - private ManagedInterpreterGroup createInterpreterGroup(String groupId) - throws InterpreterException { + private ManagedInterpreterGroup 
createInterpreterGroup(String groupId) { AngularObjectRegistry angularObjectRegistry; ManagedInterpreterGroup interpreterGroup = new ManagedInterpreterGroup(groupId, this); angularObjectRegistry = @@ -938,7 +816,8 @@ public class InterpreterSetting { ); newProperties.put(key, property); } else { - throw new RuntimeException("Can not convert this type of property: " + value.getClass()); + throw new RuntimeException("Can not convert this type of property: " + + value.getClass()); } } return newProperties; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java index 9dfce21..f34195d 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java @@ -744,11 +744,12 @@ public class InterpreterSettingManager { } /** - * Change interpreter property and restart + * Change interpreter properties and restart */ public void setPropertyAndRestart(String id, InterpreterOption option, Map<String, InterpreterProperty> properties, - List<Dependency> dependencies) throws IOException { + List<Dependency> dependencies) + throws InterpreterException, IOException { synchronized (interpreterSettings) { InterpreterSetting intpSetting = interpreterSettings.get(id); if (intpSetting != null) { @@ -761,7 +762,7 @@ public class InterpreterSettingManager { saveToFile(); } catch (Exception e) { loadFromFile(); - throw e; + throw new IOException(e); } } else { throw new InterpreterException("Interpreter setting id " + id + " not found"); @@ -770,7 +771,7 @@ public class 
InterpreterSettingManager { } // restart in note page - public void restart(String settingId, String noteId, String user) { + public void restart(String settingId, String noteId, String user) throws InterpreterException { InterpreterSetting intpSetting = interpreterSettings.get(settingId); Preconditions.checkNotNull(intpSetting); synchronized (interpreterSettings) { @@ -794,7 +795,7 @@ public class InterpreterSettingManager { } } - public void restart(String id) { + public void restart(String id) throws InterpreterException { restart(id, "", "anonymous"); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java index 1d7d916..ff9cb1c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java @@ -25,6 +25,7 @@ import org.apache.zeppelin.scheduler.SchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.Collection; import java.util.List; @@ -52,7 +53,7 @@ public class ManagedInterpreterGroup extends InterpreterGroup { return interpreterSetting; } - public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() { + public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() throws IOException { if (remoteInterpreterProcess == null) { LOGGER.info("Create InterperterProcess for InterpreterGroup: " + getId()); remoteInterpreterProcess = interpreterSetting.createInterpreterProcess(); @@ -112,7 +113,11 @@ public class ManagedInterpreterGroup extends InterpreterGroup { 
LOGGER.info("Job " + job.getJobName() + " aborted "); } - interpreter.close(); + try { + interpreter.close(); + } catch (InterpreterException e) { + LOGGER.warn("Fail to close interpreter " + interpreter.getClassName(), e); + } //TODO(zjffdu) move the close of schedule to Interpreter if (null != scheduler) { SchedulerFactory.singleton().removeScheduler(scheduler.getName()); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java new file mode 100644 index 0000000..f419967 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.zeppelin.interpreter.launcher; + +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.InterpreterOption; +import org.apache.zeppelin.interpreter.InterpreterRunner; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * Interpreter launcher which uses a shell script to launch the interpreter process. + * + */ +public class ShellScriptLauncher extends InterpreterLauncher { + + private static final Logger LOGGER = LoggerFactory.getLogger(ShellScriptLauncher.class); + + public ShellScriptLauncher(ZeppelinConfiguration zConf) { + super(zConf); + } + + @Override + public InterpreterClient launch(InterpreterLaunchContext context) { + LOGGER.info("Launching Interpreter: " + context.getInterpreterGroupName()); + this.properties = context.getProperties(); + InterpreterOption option = context.getOption(); + InterpreterRunner runner = context.getRunner(); + String groupName = context.getInterpreterGroupName(); + + int connectTimeout = + zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); + if (option.isExistingProcess()) { + return new RemoteInterpreterRunningProcess( + connectTimeout, + option.getHost(), + option.getPort()); + } else { + // create new remote process + String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/" + + context.getInterpreterGroupId(); + return new RemoteInterpreterManagedProcess( + runner != null ? 
runner.getPath() : zConf.getInterpreterRemoteRunnerPath(), + zConf.getCallbackPortRange(), + zConf.getInterpreterDir() + "/" + groupName, localRepoPath, + buildEnvFromProperties(), connectTimeout, groupName); + } + } + + protected Map<String, String> buildEnvFromProperties() { + Map<String, String> env = new HashMap<>(); + for (Object key : properties.keySet()) { + if (RemoteInterpreterUtils.isEnvString((String) key)) { + env.put((String) key, properties.getProperty((String) key)); + } + } + return env; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java new file mode 100644 index 0000000..32a0530 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.interpreter.launcher; + +import org.apache.commons.lang3.StringUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * Spark specific launcher. + */ +public class SparkInterpreterLauncher extends ShellScriptLauncher { + + private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterLauncher.class); + + public SparkInterpreterLauncher(ZeppelinConfiguration zConf) { + super(zConf); + } + + @Override + protected Map<String, String> buildEnvFromProperties() { + Map<String, String> env = new HashMap<String, String>(); + Properties sparkProperties = new Properties(); + String sparkMaster = getSparkMaster(properties); + for (String key : properties.stringPropertyNames()) { + if (RemoteInterpreterUtils.isEnvString(key)) { + env.put(key, properties.getProperty(key)); + } + if (isSparkConf(key, properties.getProperty(key))) { + sparkProperties.setProperty(key, toShellFormat(properties.getProperty(key))); + } + } + + setupPropertiesForPySpark(sparkProperties); + setupPropertiesForSparkR(sparkProperties); + if (isYarnMode() && getDeployMode().equals("cluster")) { + env.put("ZEPPELIN_SPARK_YARN_CLUSTER", "true"); + } + + StringBuilder sparkConfBuilder = new StringBuilder(); + if (sparkMaster != null) { + sparkConfBuilder.append(" --master " + sparkMaster); + } + if (isYarnMode() && getDeployMode().equals("cluster")) { + sparkConfBuilder.append(" --files " + zConf.getConfDir() + "/log4j_yarn_cluster.properties"); + } + for (String name : sparkProperties.stringPropertyNames()) { + sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name)); + } + + env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString()); + + // set these env in the order of + // 1. 
interpreter-setting + // 2. zeppelin-env.sh + // It is encouraged to set env in interpreter setting, but just for backward compatibility, + // we also fall back to zeppelin-env.sh if it is not specified in interpreter setting. + for (String envName : new String[]{"SPARK_HOME", "SPARK_CONF_DIR", "HADOOP_CONF_DIR"}) { + String envValue = getEnv(envName); + if (envValue != null) { + env.put(envName, envValue); + } + } + LOGGER.debug("buildEnvFromProperties: " + env); + return env; + + } + + + /** + * Get the environment variable, looked up in the following order + * + * 1. interpreter setting + * 2. zeppelin-env.sh + * + */ + private String getEnv(String envName) { + String env = properties.getProperty(envName); + if (env == null) { + env = System.getenv(envName); + } + return env; + } + + private boolean isSparkConf(String key, String value) { + return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value); + } + + private void setupPropertiesForPySpark(Properties sparkProperties) { + if (isYarnMode()) { + sparkProperties.setProperty("spark.yarn.isPython", "true"); + } + } + + private void mergeSparkProperty(Properties sparkProperties, String propertyName, + String propertyValue) { + if (sparkProperties.containsKey(propertyName)) { + String oldPropertyValue = sparkProperties.getProperty(propertyName); + sparkProperties.setProperty(propertyName, oldPropertyValue + "," + propertyValue); + } else { + sparkProperties.setProperty(propertyName, propertyValue); + } + } + + private void setupPropertiesForSparkR(Properties sparkProperties) { + String sparkHome = getEnv("SPARK_HOME"); + File sparkRBasePath = null; + if (sparkHome == null) { + if (!getSparkMaster(properties).startsWith("local")) { + throw new RuntimeException("SPARK_HOME is not specified in interpreter-setting" + + " for non-local mode, if you specify it in zeppelin-env.sh, please move that into " + + " interpreter setting"); + } + String zeppelinHome = 
zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME); + sparkRBasePath = new File(zeppelinHome, + "interpreter" + File.separator + "spark" + File.separator + "R"); + } else { + sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib"); + } + + File sparkRPath = new File(sparkRBasePath, "sparkr.zip"); + if (sparkRPath.exists() && sparkRPath.isFile()) { + mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives", sparkRPath.getAbsolutePath()); + } else { + LOGGER.warn("sparkr.zip is not found, SparkR may not work."); + } + } + + /** + * Order to look for spark master + * 1. master in interpreter setting + * 2. spark.master interpreter setting + * 3. use local[*] + * @param properties + * @return + */ + private String getSparkMaster(Properties properties) { + String master = properties.getProperty("master"); + if (master == null) { + master = properties.getProperty("spark.master"); + if (master == null) { + master = "local[*]"; + } + } + return master; + } + + private String getDeployMode() { + String master = getSparkMaster(properties); + if (master.equals("yarn-client")) { + return "client"; + } else if (master.equals("yarn-cluster")) { + return "cluster"; + } else if (master.startsWith("local")) { + return "client"; + } else { + String deployMode = properties.getProperty("spark.submit.deployMode"); + if (deployMode == null) { + throw new RuntimeException("master is set as yarn, but spark.submit.deployMode " + + "is not specified"); + } + if (!deployMode.equals("client") && !deployMode.equals("cluster")) { + throw new RuntimeException("Invalid value for spark.submit.deployMode: " + deployMode); + } + return deployMode; + } + } + + private boolean isYarnMode() { + return getSparkMaster(properties).startsWith("yarn"); + } + + private String toShellFormat(String value) { + if (value.contains("\'") && value.contains("\"")) { + throw new RuntimeException("Spark property value could not contain both \" and '"); + } else if (value.contains("\'")) { + 
return "\"" + value + "\""; + } else { + return "\'" + value + "\'"; + } + } + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java index 064abd5..7653824 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java @@ -82,7 +82,7 @@ public class InterpreterContextRunnerPool { } } - throw new InterpreterException("Can not run paragraph " + paragraphId + " on " + noteId); + throw new RuntimeException("Can not run paragraph " + paragraphId + " on " + noteId); } } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 54bf9e1..b479799 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -28,6 +28,7 @@ import org.apache.zeppelin.display.Input; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterContextRunner; +import org.apache.zeppelin.interpreter.InterpreterException; 
import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; @@ -42,6 +43,7 @@ import org.apache.zeppelin.scheduler.SchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -90,7 +92,7 @@ public class RemoteInterpreter extends Interpreter { return this.sessionId; } - public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() { + public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() throws IOException { if (this.interpreterProcess != null) { return this.interpreterProcess; } @@ -113,7 +115,7 @@ public class RemoteInterpreter extends Interpreter { } @Override - public void open() { + public void open() throws InterpreterException { synchronized (this) { if (!isOpened) { // create all the interpreters of the same session first, then Open the internal interpreter @@ -123,7 +125,11 @@ public class RemoteInterpreter extends Interpreter { // also see method Interpreter.getInterpreterInTheSameSessionByClassName for (Interpreter interpreter : getInterpreterGroup() .getOrCreateSession(userName, sessionId)) { - ((RemoteInterpreter) interpreter).internal_create(); + try { + ((RemoteInterpreter) interpreter).internal_create(); + } catch (IOException e) { + throw new InterpreterException(e); + } } interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() { @@ -147,7 +153,7 @@ public class RemoteInterpreter extends Interpreter { } } - private void internal_create() { + private void internal_create() throws IOException { synchronized (this) { if (!isCreated) { RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess(); @@ -156,7 +162,7 @@ public class RemoteInterpreter extends Interpreter { public Void call(Client client) throws Exception { 
LOGGER.info("Create RemoteInterpreter {}", getClassName()); client.createInterpreter(getInterpreterGroup().getId(), sessionId, - className, (Map) property, userName); + className, (Map) getProperties(), userName); return null; } }); @@ -167,9 +173,14 @@ public class RemoteInterpreter extends Interpreter { @Override - public void close() { + public void close() throws InterpreterException { if (isOpened) { - RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess(); + RemoteInterpreterProcess interpreterProcess = null; + try { + interpreterProcess = getOrCreateInterpreterProcess(); + } catch (IOException e) { + throw new InterpreterException(e); + } interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() { @Override public Void call(Client client) throws Exception { @@ -184,13 +195,19 @@ public class RemoteInterpreter extends Interpreter { } @Override - public InterpreterResult interpret(final String st, final InterpreterContext context) { + public InterpreterResult interpret(final String st, final InterpreterContext context) + throws InterpreterException { if (LOGGER.isDebugEnabled()) { LOGGER.debug("st:\n{}", st); } final FormType form = getFormType(); - RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess(); + RemoteInterpreterProcess interpreterProcess = null; + try { + interpreterProcess = getOrCreateInterpreterProcess(); + } catch (IOException e) { + throw new InterpreterException(e); + } InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess .getInterpreterContextRunnerPool(); List<InterpreterContextRunner> runners = context.getRunners(); @@ -238,12 +255,17 @@ public class RemoteInterpreter extends Interpreter { } @Override - public void cancel(final InterpreterContext context) { + public void cancel(final InterpreterContext context) throws InterpreterException { if (!isOpened) { LOGGER.warn("Cancel is called when RemoterInterpreter is not opened for " + 
className); return; } - RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess(); + RemoteInterpreterProcess interpreterProcess = null; + try { + interpreterProcess = getOrCreateInterpreterProcess(); + } catch (IOException e) { + throw new InterpreterException(e); + } interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() { @Override public Void call(Client client) throws Exception { @@ -254,7 +276,7 @@ public class RemoteInterpreter extends Interpreter { } @Override - public FormType getFormType() { + public FormType getFormType() throws InterpreterException { if (formType != null) { return formType; } @@ -265,7 +287,12 @@ public class RemoteInterpreter extends Interpreter { open(); } } - RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess(); + RemoteInterpreterProcess interpreterProcess = null; + try { + interpreterProcess = getOrCreateInterpreterProcess(); + } catch (IOException e) { + throw new InterpreterException(e); + } FormType type = interpreterProcess.callRemoteFunction( new RemoteInterpreterProcess.RemoteFunction<FormType>() { @Override @@ -277,13 +304,19 @@ public class RemoteInterpreter extends Interpreter { return type; } + @Override - public int getProgress(final InterpreterContext context) { + public int getProgress(final InterpreterContext context) throws InterpreterException { if (!isOpened) { LOGGER.warn("getProgress is called when RemoterInterpreter is not opened for " + className); return 0; } - RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess(); + RemoteInterpreterProcess interpreterProcess = null; + try { + interpreterProcess = getOrCreateInterpreterProcess(); + } catch (IOException e) { + throw new InterpreterException(e); + } return interpreterProcess.callRemoteFunction( new RemoteInterpreterProcess.RemoteFunction<Integer>() { @Override @@ -296,12 +329,18 @@ public class RemoteInterpreter extends Interpreter { @Override public 
List<InterpreterCompletion> completion(final String buf, final int cursor, - final InterpreterContext interpreterContext) { + final InterpreterContext interpreterContext) + throws InterpreterException { if (!isOpened) { LOGGER.warn("completion is called when RemoterInterpreter is not opened for " + className); return new ArrayList<>(); } - RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess(); + RemoteInterpreterProcess interpreterProcess = null; + try { + interpreterProcess = getOrCreateInterpreterProcess(); + } catch (IOException e) { + throw new InterpreterException(e); + } return interpreterProcess.callRemoteFunction( new RemoteInterpreterProcess.RemoteFunction<List<InterpreterCompletion>>() { @Override @@ -317,7 +356,12 @@ public class RemoteInterpreter extends Interpreter { LOGGER.warn("getStatus is called when RemoteInterpreter is not opened for " + className); return Job.Status.UNKNOWN.name(); } - RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess(); + RemoteInterpreterProcess interpreterProcess = null; + try { + interpreterProcess = getOrCreateInterpreterProcess(); + } catch (IOException e) { + throw new RuntimeException(e); + } return interpreterProcess.callRemoteFunction( new RemoteInterpreterProcess.RemoteFunction<String>() { @Override @@ -331,7 +375,7 @@ public class RemoteInterpreter extends Interpreter { @Override public Scheduler getScheduler() { int maxConcurrency = Integer.parseInt( - property.getProperty("zeppelin.interpreter.max.poolsize", + getProperty("zeppelin.interpreter.max.poolsize", ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE.getIntValue() + "")); Scheduler s = new RemoteScheduler( http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java ---------------------------------------------------------------------- diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java index d21a962..6e26e58 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.interpreter.remote; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.exec.CommandLine; import org.apache.commons.exec.DefaultExecutor; import org.apache.commons.exec.ExecuteException; @@ -73,11 +74,8 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess String localRepoDir, Map<String, String> env, int connectTimeout, - RemoteInterpreterProcessListener listener, - ApplicationEventListener appListener, String interpreterGroupName) { - super(new RemoteInterpreterEventPoller(listener, appListener), - connectTimeout); + super(connectTimeout); this.interpreterRunner = intpRunner; this.portRange = portRange; this.env = env; @@ -86,23 +84,6 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess this.interpreterGroupName = interpreterGroupName; } - RemoteInterpreterManagedProcess(String intpRunner, - String intpDir, - String localRepoDir, - Map<String, String> env, - RemoteInterpreterEventPoller remoteInterpreterEventPoller, - int connectTimeout, - String interpreterGroupName) { - super(remoteInterpreterEventPoller, - connectTimeout); - this.interpreterRunner = intpRunner; - this.portRange = ":"; - this.env = env; - this.interpreterDir = intpDir; - this.localRepoDir = localRepoDir; - this.interpreterGroupName = interpreterGroupName; - } - @Override public String getHost() { return "localhost"; @@ -124,7 +105,7 @@ public class RemoteInterpreterManagedProcess extends 
RemoteInterpreterProcess callbackHost = RemoteInterpreterUtils.findAvailableHostAddress(); callbackPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); } catch (IOException e1) { - throw new InterpreterException(e1); + throw new RuntimeException(e1); } logger.info("Thrift server for callback will start. Port: {}", callbackPort); @@ -206,7 +187,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess executor.execute(cmdLine, procEnv, this); } catch (IOException e) { running.set(false); - throw new InterpreterException(e); + throw new RuntimeException(e); } try { @@ -217,7 +198,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess } if (!running.get()) { callbackServer.stop(); - throw new InterpreterException("Cannot run interpreter"); + throw new RuntimeException(new String(cmdOut.toByteArray())); } } catch (InterruptedException e) { logger.error("Remote interpreter is not accessible"); @@ -227,7 +208,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess public void stop() { // shutdown EventPoller first. 
- this.remoteInterpreterEventPoller.shutdown(); + this.getRemoteInterpreterEventPoller().shutdown(); if (callbackServer.isServing()) { callbackServer.stop(); } @@ -266,6 +247,31 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess running.set(false); } + @VisibleForTesting + public Map<String, String> getEnv() { + return env; + } + + @VisibleForTesting + public String getLocalRepoDir() { + return localRepoDir; + } + + @VisibleForTesting + public String getInterpreterDir() { + return interpreterDir; + } + + @VisibleForTesting + public String getInterpreterGroupName() { + return interpreterGroupName; + } + + @VisibleForTesting + public String getInterpreterRunner() { + return interpreterRunner; + } + public boolean isRunning() { return running.get(); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index e45f15b..88cc489 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -21,6 +21,7 @@ import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.thrift.TException; import org.apache.zeppelin.helium.ApplicationEventListener; import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.launcher.InterpreterClient; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,27 +29,17 @@ import org.slf4j.LoggerFactory; /** * Abstract class for 
interpreter process */ -public abstract class RemoteInterpreterProcess { +public abstract class RemoteInterpreterProcess implements InterpreterClient { private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class); private GenericObjectPool<Client> clientPool; - protected final RemoteInterpreterEventPoller remoteInterpreterEventPoller; + private RemoteInterpreterEventPoller remoteInterpreterEventPoller; private final InterpreterContextRunnerPool interpreterContextRunnerPool; private int connectTimeout; public RemoteInterpreterProcess( - int connectTimeout, - RemoteInterpreterProcessListener listener, - ApplicationEventListener appListener) { - this(new RemoteInterpreterEventPoller(listener, appListener), - connectTimeout); - this.remoteInterpreterEventPoller.setInterpreterProcess(this); - } - - RemoteInterpreterProcess(RemoteInterpreterEventPoller remoteInterpreterEventPoller, - int connectTimeout) { + int connectTimeout) { this.interpreterContextRunnerPool = new InterpreterContextRunnerPool(); - this.remoteInterpreterEventPoller = remoteInterpreterEventPoller; this.connectTimeout = connectTimeout; } @@ -56,6 +47,10 @@ public abstract class RemoteInterpreterProcess { return remoteInterpreterEventPoller; } + public void setRemoteInterpreterEventPoller(RemoteInterpreterEventPoller eventPoller) { + this.remoteInterpreterEventPoller = eventPoller; + } + public abstract String getHost(); public abstract int getPort(); public abstract void start(String userName, Boolean isUserImpersonate); @@ -147,9 +142,9 @@ public abstract class RemoteInterpreterProcess { } } catch (TException e) { broken = true; - throw new InterpreterException(e); + throw new RuntimeException(e); } catch (Exception e1) { - throw new InterpreterException(e1); + throw new RuntimeException(e1); } finally { if (client != null) { releaseClient(client, broken); 
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java index bb176be..d8715a0 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java @@ -30,12 +30,10 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess { public RemoteInterpreterRunningProcess( int connectTimeout, - RemoteInterpreterProcessListener listener, - ApplicationEventListener appListener, String host, int port ) { - super(connectTimeout, listener, appListener); + super(connectTimeout); this.host = host; this.port = port; } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 03c5046..b5dda67 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -627,7 +627,7 @@ public class Note implements ParagraphJobListener, JsonSerializable { if (intp == null) { String intpExceptionMsg = p.getJobName() + "'s Interpreter " + requiredReplName + " not found"; - InterpreterException intpException = new InterpreterException(intpExceptionMsg); + RuntimeException intpException = new 
RuntimeException(intpExceptionMsg); InterpreterResult intpResult = new InterpreterResult(InterpreterResult.Code.ERROR, intpException.getMessage()); p.setReturn(intpResult, intpException); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 4652fcd..77fd04c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -892,7 +892,11 @@ public class Notebook implements NoteEventListener { if (releaseResource) { for (InterpreterSetting setting : notebook.getInterpreterSettingManager() .getInterpreterSettings(note.getId())) { - notebook.getInterpreterSettingManager().restart(setting.getId()); + try { + notebook.getInterpreterSettingManager().restart(setting.getId()); + } catch (InterpreterException e) { + logger.error("Fail to restart interpreter: " + setting.getId(), e); + } } } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 68ce794..701943a 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -312,15 +312,14 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable { String replName = getRequiredReplName(trimmedBuffer); String body = getScriptBody(trimmedBuffer); - Interpreter 
repl = getRepl(replName); - if (repl == null) { - return null; - } - InterpreterContext interpreterContext = getInterpreterContextWithoutRunner(null); - List completion = repl.completion(body, cursor, interpreterContext); - return completion; + try { + Interpreter repl = getRepl(replName); + return repl.completion(body, cursor, interpreterContext); + } catch (InterpreterException e) { + throw new RuntimeException("Fail to get completion", e); + } } public int calculateCursorPosition(String buffer, String trimmedBuffer, int cursor) { @@ -362,11 +361,15 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable { @Override public int progress() { String replName = getRequiredReplName(); - Interpreter repl = getRepl(replName); - if (repl != null) { + + try { + Interpreter repl = getRepl(replName); + if (repl == null) { + return 0; + } return repl.getProgress(getInterpreterContext(null)); - } else { - return 0; + } catch (InterpreterException e) { + throw new RuntimeException("Fail to get progress", e); } } @@ -494,10 +497,8 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable { protected boolean jobAbort() { Interpreter repl = getRepl(getRequiredReplName()); if (repl == null) { - // when interpreters are already destroyed return true; } - Scheduler scheduler = repl.getScheduler(); if (scheduler == null) { return true; @@ -507,7 +508,11 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable { if (job != null) { job.setStatus(Status.ABORT); } else { - repl.cancel(getInterpreterContextWithoutRunner(null)); + try { + repl.cancel(getInterpreterContextWithoutRunner(null)); + } catch (InterpreterException e) { + throw new RuntimeException(e); + } } return true; } @@ -738,12 +743,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable { } private boolean isValidInterpreter(String replName) { - try { - return factory.getInterpreter(user, note.getId(), replName) != null; - } catch 
(InterpreterException e) { - // ignore this exception, it would be recaught when running paragraph. - return false; - } + return factory.getInterpreter(user, note.getId(), replName) != null; } public void updateRuntimeInfos(String label, String tooltip, Map<String, String> infos, http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java index 8f3e615..f23d433 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java @@ -19,6 +19,7 @@ package org.apache.zeppelin.helium; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.AbstractInterpreterTest; import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResultMessage; import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.notebook.ApplicationState; @@ -241,7 +242,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implem @Test - public void testUnloadOnInterpreterRestart() throws IOException { + public void testUnloadOnInterpreterRestart() throws IOException, InterpreterException { // given HeliumPackage pkg1 = new HeliumPackage(HeliumType.APPLICATION, "name1",