// http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/com/nflabs/zeppelin/util/Util.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/com/nflabs/zeppelin/util/Util.java b/zeppelin-zengine/src/main/java/com/nflabs/zeppelin/util/Util.java deleted file mode 100644 index 5d45b06..0000000 --- a/zeppelin-zengine/src/main/java/com/nflabs/zeppelin/util/Util.java +++ /dev/null @@ -1,170 +0,0 @@
// Originally declared in package com.nflabs.zeppelin.util (this hunk removes the file).

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

/**
 * Splits a string on one or more splitter tokens while honouring an escape character and
 * quoted / bracketed "blocks" inside which splitters are ignored.
 *
 * <p>Block delimiters are given as two parallel arrays ({@code blockStart} / {@code blockEnd}).
 * A delimiter prefixed with {@code "N_"} denotes a nestable block; the prefix is stripped
 * before matching.
 *
 * @author Leemoonsoo
 */
public class Util {

  /** Prefix that marks a block delimiter definition as nestable. */
  private static final String NESTED_PREFIX = "N_";

  /**
   * Splits {@code str} on a single character using the default escape and block rules.
   * The splitter itself is not included in the result.
   *
   * @param str text to split
   * @param split splitter character
   * @return the tokens, in order of appearance
   */
  public static String[] split(String str, char split) {
    return split(str, new String[] {String.valueOf(split)}, false);
  }

  /**
   * Splits {@code str} on any of {@code splitters} using the default rules: backslash is the
   * escape character, {@code "',;<%>} are the escapable characters, and blocks are delimited by
   * double quotes, single quotes, {@code <% ... %>} and nestable {@code < ... >}.
   *
   * @param str text to split
   * @param splitters splitter tokens, tried in array order
   * @param includeSplitter when true, each matched splitter is emitted as its own token
   * @return the tokens, in order of appearance
   */
  public static String[] split(String str, String[] splitters, boolean includeSplitter) {
    String escapeSeq = "\"',;<%>";
    char escapeChar = '\\';
    String[] blockStart = new String[] {"\"", "'", "<%", "N_<"};
    String[] blockEnd = new String[] {"\"", "'", "%>", "N_>"};

    return split(str, escapeSeq, escapeChar, blockStart, blockEnd, splitters, includeSplitter);
  }

  /**
   * Splits {@code str} on any of {@code splitters}, ignoring splitters that are escaped or that
   * appear inside a block.
   *
   * <p>Behaviour worth knowing:
   * <ul>
   *   <li>An escaped character that is listed in {@code escapeSeq} is emitted without the escape
   *       character; any other escaped character keeps the escape character in front of it.</li>
   *   <li>Delimiter matching only looks at text appended after the most recent escaped character
   *       (and, as written, skips one further character after it).</li>
   *   <li>When a non-nestable block closer is itself one of the splitters, the token is cut at
   *       the closer.</li>
   *   <li>Only the final token is trimmed; earlier tokens keep their surrounding whitespace.
   *       This asymmetry is preserved from the original implementation.</li>
   *   <li>Empty splitters are ignored. Previously an empty splitter matched at every position
   *       without advancing, so the loop never terminated.</li>
   * </ul>
   *
   * @param str text to split
   * @param escapeSeq characters whose escape character is dropped from the output
   * @param escapeChar the escape character
   * @param blockStart block opening delimiters (optionally prefixed with {@code N_})
   * @param blockEnd block closing delimiters, parallel to {@code blockStart}
   * @param splitters splitter tokens, tried in array order
   * @param includeSplitter when true, each matched splitter is emitted as its own token
   * @return the tokens, in order of appearance
   */
  public static String[] split(String str, String escapeSeq, char escapeChar, String[] blockStart,
      String[] blockEnd, String[] splitters, boolean includeSplitter) {

    List<String> splits = new ArrayList<String>();
    StringBuilder current = new StringBuilder();

    boolean escape = false; // true right after an unescaped escape char
    int lastEscapeOffset = -1; // length of 'current' just after the last escaped char
    int blockStartPos = -1; // index in 'str' where the innermost block was opened
    LinkedList<Integer> blockStack = new LinkedList<Integer>(); // head = innermost block index

    for (int i = 0; i < str.length(); i++) {
      char c = str.charAt(i);

      // escape char detected
      if (c == escapeChar && !escape) {
        escape = true;
        continue;
      }

      // escaped char comes
      if (escape) {
        if (escapeSeq.indexOf(c) < 0) {
          current.append(escapeChar);
        }
        current.append(c);
        escape = false;
        lastEscapeOffset = current.length();
        continue;
      }

      if (!blockStack.isEmpty()) { // inside of block
        current.append(c);

        // check multichar block
        // NOTE(review): the compared range excludes the current char (end index is i, not
        // i + 1); kept as-is because existing callers may rely on the resulting behaviour.
        boolean multicharBlockDetected = false;
        if (blockStartPos >= 0) {
          String sinceBlockStart = str.substring(blockStartPos, i);
          for (int b = 0; b < blockStart.length; b++) {
            if (getBlockStr(blockStart[b]).equals(sinceBlockStart)) {
              blockStack.pop();
              blockStack.push(b);
              multicharBlockDetected = true;
              break;
            }
          }
        }
        if (multicharBlockDetected) {
          continue;
        }

        int top = blockStack.peek();

        // a nestable block may open another level of itself
        if (isNestedBlock(blockStart[top])
            && endsWithSince(current, lastEscapeOffset + 1, getBlockStr(blockStart[top]))) {
          blockStack.push(top);
          blockStartPos = i;
          continue;
        }

        // check if block is finishing
        String closer = getBlockStr(blockEnd[top]);
        if (endsWithSince(current, lastEscapeOffset + 1, closer)) {
          // the block closer is one of the splitters (and not nested block)
          if (!isNestedBlock(blockEnd[top])) {
            for (String splitter : splitters) {
              if (splitter.equals(closer)) {
                splits.add(current.toString());
                if (includeSplitter) {
                  splits.add(splitter);
                }
                current.setLength(0);
                lastEscapeOffset = -1;
                break;
              }
            }
          }
          blockStartPos = -1;
          blockStack.pop();
          continue;
        }

      } else { // not in the block
        boolean splitted = false;
        for (String splitter : splitters) {
          if (splitter.isEmpty()) {
            // An empty splitter would match here without consuming input and loop forever.
            continue;
          }
          // forward check for splitter
          if (str.startsWith(splitter, i)) {
            splits.add(current.toString());
            if (includeSplitter) {
              splits.add(splitter);
            }
            current.setLength(0);
            lastEscapeOffset = -1;
            i += splitter.length() - 1;
            splitted = true;
            break;
          }
        }
        if (splitted) {
          continue;
        }

        // add char to current string
        current.append(c);

        // check if block is started
        for (int b = 0; b < blockStart.length; b++) {
          if (endsWithSince(current, lastEscapeOffset + 1, getBlockStr(blockStart[b]))) {
            blockStack.push(b); // block is started
            blockStartPos = i;
            break;
          }
        }
      }
    }

    if (current.length() > 0) {
      splits.add(current.toString().trim());
    }
    return splits.toArray(new String[] {});
  }

  /**
   * Tells whether the part of {@code text} starting at index {@code from} ends with
   * {@code suffix}, without allocating a substring.
   */
  private static boolean endsWithSince(StringBuilder text, int from, String suffix) {
    int start = text.length() - suffix.length();
    if (start < from) {
      return false;
    }
    for (int k = 0; k < suffix.length(); k++) {
      if (text.charAt(start + k) != suffix.charAt(k)) {
        return false;
      }
    }
    return true;
  }

  /** Returns the delimiter text of a block definition, i.e. without the {@code N_} prefix. */
  private static String getBlockStr(String blockDef) {
    if (blockDef.startsWith(NESTED_PREFIX)) {
      return blockDef.substring(NESTED_PREFIX.length());
    }
    return blockDef;
  }

  /** Returns true when the block definition carries the {@code N_} (nestable) prefix. */
  private static boolean isNestedBlock(String blockDef) {
    return blockDef.startsWith(NESTED_PREFIX);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java new file mode 100644 index 0000000..8495bb5 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -0,0 +1,531 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.conf; + +import java.net.URL; +import java.util.List; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.commons.configuration.tree.ConfigurationNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Zeppelin configuration. 
+ * + * @author Leemoonsoo + * + */ +public class ZeppelinConfiguration extends XMLConfiguration { + private static final String ZEPPELIN_SITE_XML = "zeppelin-site.xml"; + private static final long serialVersionUID = 4749305895693848035L; + private static final Logger LOG = LoggerFactory.getLogger(ZeppelinConfiguration.class); + private static ZeppelinConfiguration conf; + + public ZeppelinConfiguration(URL url) throws ConfigurationException { + setDelimiterParsingDisabled(true); + load(url); + } + + public ZeppelinConfiguration() { + ConfVars[] vars = ConfVars.values(); + for (ConfVars v : vars) { + if (v.getType() == ConfVars.VarType.BOOLEAN) { + this.setProperty(v.getVarName(), v.getBooleanValue()); + } else if (v.getType() == ConfVars.VarType.LONG) { + this.setProperty(v.getVarName(), v.getLongValue()); + } else if (v.getType() == ConfVars.VarType.INT) { + this.setProperty(v.getVarName(), v.getIntValue()); + } else if (v.getType() == ConfVars.VarType.FLOAT) { + this.setProperty(v.getVarName(), v.getFloatValue()); + } else if (v.getType() == ConfVars.VarType.STRING) { + this.setProperty(v.getVarName(), v.getStringValue()); + } else { + throw new RuntimeException("Unsupported VarType"); + } + } + + } + + + /** + * Load from resource. 
+ * + * @throws ConfigurationException + */ + public static ZeppelinConfiguration create() { + if (conf != null) { + return conf; + } + + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + URL url; + + url = ZeppelinConfiguration.class.getResource(ZEPPELIN_SITE_XML); + if (url == null) { + ClassLoader cl = ZeppelinConfiguration.class.getClassLoader(); + if (cl != null) { + url = cl.getResource(ZEPPELIN_SITE_XML); + } + } + if (url == null) { + url = classLoader.getResource(ZEPPELIN_SITE_XML); + } + + if (url == null) { + LOG.warn("Failed to load configuration, proceeding with a default"); + conf = new ZeppelinConfiguration(); + } else { + try { + LOG.info("Load configuration from " + url); + conf = new ZeppelinConfiguration(url); + } catch (ConfigurationException e) { + LOG.warn("Failed to load configuration from " + url + " proceeding with a default", e); + conf = new ZeppelinConfiguration(); + } + } + + return conf; + } + + + private String getStringValue(String name, String d) { + List<ConfigurationNode> properties = getRootNode().getChildren(); + if (properties == null || properties.size() == 0) { + return d; + } + for (ConfigurationNode p : properties) { + if (p.getChildren("name") != null && p.getChildren("name").size() > 0 + && name.equals(p.getChildren("name").get(0).getValue())) { + return (String) p.getChildren("value").get(0).getValue(); + } + } + return d; + } + + private int getIntValue(String name, int d) { + List<ConfigurationNode> properties = getRootNode().getChildren(); + if (properties == null || properties.size() == 0) { + return d; + } + for (ConfigurationNode p : properties) { + if (p.getChildren("name") != null && p.getChildren("name").size() > 0 + && name.equals(p.getChildren("name").get(0).getValue())) { + return Integer.parseInt((String) p.getChildren("value").get(0).getValue()); + } + } + return d; + } + + private long getLongValue(String name, long d) { + List<ConfigurationNode> properties = 
getRootNode().getChildren(); + if (properties == null || properties.size() == 0) { + return d; + } + for (ConfigurationNode p : properties) { + if (p.getChildren("name") != null && p.getChildren("name").size() > 0 + && name.equals(p.getChildren("name").get(0).getValue())) { + return Long.parseLong((String) p.getChildren("value").get(0).getValue()); + } + } + return d; + } + + private float getFloatValue(String name, float d) { + List<ConfigurationNode> properties = getRootNode().getChildren(); + if (properties == null || properties.size() == 0) { + return d; + } + for (ConfigurationNode p : properties) { + if (p.getChildren("name") != null && p.getChildren("name").size() > 0 + && name.equals(p.getChildren("name").get(0).getValue())) { + return Float.parseFloat((String) p.getChildren("value").get(0).getValue()); + } + } + return d; + } + + private boolean getBooleanValue(String name, boolean d) { + List<ConfigurationNode> properties = getRootNode().getChildren(); + if (properties == null || properties.size() == 0) { + return d; + } + for (ConfigurationNode p : properties) { + if (p.getChildren("name") != null && p.getChildren("name").size() > 0 + && name.equals(p.getChildren("name").get(0).getValue())) { + return Boolean.parseBoolean((String) p.getChildren("value").get(0).getValue()); + } + } + return d; + } + + public String getString(ConfVars c) { + return getString(c.name(), c.getVarName(), c.getStringValue()); + } + + public String getString(String envName, String propertyName, String defaultValue) { + if (System.getenv(envName) != null) { + return System.getenv(envName); + } + if (System.getProperty(propertyName) != null) { + return System.getProperty(propertyName); + } + + return getStringValue(propertyName, defaultValue); + } + + public int getInt(ConfVars c) { + return getInt(c.name(), c.getVarName(), c.getIntValue()); + } + + public int getInt(String envName, String propertyName, int defaultValue) { + if (System.getenv(envName) != null) { + return 
Integer.parseInt(System.getenv(envName)); + } + + if (System.getProperty(propertyName) != null) { + return Integer.parseInt(System.getProperty(propertyName)); + } + return getIntValue(propertyName, defaultValue); + } + + public long getLong(ConfVars c) { + return getLong(c.name(), c.getVarName(), c.getLongValue()); + } + + public long getLong(String envName, String propertyName, long defaultValue) { + if (System.getenv(envName) != null) { + return Long.parseLong(System.getenv(envName)); + } + + if (System.getProperty(propertyName) != null) { + return Long.parseLong(System.getProperty(propertyName)); + } + return getLongValue(propertyName, defaultValue); + } + + public float getFloat(ConfVars c) { + return getFloat(c.name(), c.getVarName(), c.getFloatValue()); + } + + public float getFloat(String envName, String propertyName, float defaultValue) { + if (System.getenv(envName) != null) { + return Float.parseFloat(System.getenv(envName)); + } + if (System.getProperty(propertyName) != null) { + return Float.parseFloat(System.getProperty(propertyName)); + } + return getFloatValue(propertyName, defaultValue); + } + + public boolean getBoolean(ConfVars c) { + return getBoolean(c.name(), c.getVarName(), c.getBooleanValue()); + } + + public boolean getBoolean(String envName, String propertyName, boolean defaultValue) { + if (System.getenv(envName) != null) { + return Boolean.parseBoolean(System.getenv(envName)); + } + + if (System.getProperty(propertyName) != null) { + return Boolean.parseBoolean(System.getProperty(propertyName)); + } + return getBooleanValue(propertyName, defaultValue); + } + + public boolean useSsl() { + return getBoolean(ConfVars.ZEPPELIN_SSL); + } + + public boolean useClientAuth() { + return getBoolean(ConfVars.ZEPPELIN_SSL_CLIENT_AUTH); + } + + public int getServerPort() { + return getInt(ConfVars.ZEPPELIN_PORT); + } + + public int getWebSocketPort() { + int port = getInt(ConfVars.ZEPPELIN_WEBSOCKET_PORT); + if (port < 0) { + return getServerPort() + 
1; + } else { + return port; + } + } + + public String getKeyStorePath() { + return getRelativeDir(ConfVars.ZEPPELIN_SSL_KEYSTORE_PATH); + } + + public String getKeyStoreType() { + return getString(ConfVars.ZEPPELIN_SSL_KEYSTORE_TYPE); + } + + public String getKeyStorePassword() { + return getString(ConfVars.ZEPPELIN_SSL_KEYSTORE_PASSWORD); + } + + public String getKeyManagerPassword() { + String password = getString(ConfVars.ZEPPELIN_SSL_KEY_MANAGER_PASSWORD); + if (password == null) { + return getKeyStorePassword(); + } else { + return password; + } + } + + public String getTrustStorePath() { + String path = getString(ConfVars.ZEPPELIN_SSL_TRUSTSTORE_PATH); + if (path == null) { + return getKeyStorePath(); + } else { + return getRelativeDir(path); + } + } + + public String getTrustStoreType() { + String type = getString(ConfVars.ZEPPELIN_SSL_TRUSTSTORE_TYPE); + if (type == null) { + return getKeyStoreType(); + } else { + return type; + } + } + + public String getTrustStorePassword() { + String password = getString(ConfVars.ZEPPELIN_SSL_TRUSTSTORE_PASSWORD); + if (password == null) { + return getKeyStorePassword(); + } else { + return password; + } + } + + public String getNotebookDir() { + return getRelativeDir(ConfVars.ZEPPELIN_NOTEBOOK_DIR); + } + + public String getInterpreterDir() { + return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_DIR); + } + + public String getInterpreterSettingPath() { + return getRelativeDir("conf/interpreter.json"); + } + + public String getInterpreterRemoteRunnerPath() { + return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER); + } + + public String getRelativeDir(ConfVars c) { + return getRelativeDir(getString(c)); + } + + public String getRelativeDir(String path) { + if (path != null && path.startsWith("/")) { + return path; + } else { + return getString(ConfVars.ZEPPELIN_HOME) + "/" + path; + } + } + + + /** + * Wrapper class. 
+ * + * @author Leemoonsoo + * + */ + public static enum ConfVars { + ZEPPELIN_HOME("zeppelin.home", "../"), + ZEPPELIN_PORT("zeppelin.server.port", 8080), + // negative websocket port denotes that server port + 1 should be used + ZEPPELIN_WEBSOCKET_PORT("zeppelin.websocket.port", -1), + ZEPPELIN_SSL("zeppelin.ssl", false), + ZEPPELIN_SSL_CLIENT_AUTH("zeppelin.ssl.client.auth", false), + ZEPPELIN_SSL_KEYSTORE_PATH("zeppelin.ssl.keystore.path", "conf/keystore"), + ZEPPELIN_SSL_KEYSTORE_TYPE("zeppelin.ssl.keystore.type", "JKS"), + ZEPPELIN_SSL_KEYSTORE_PASSWORD("zeppelin.ssl.keystore.password", ""), + ZEPPELIN_SSL_KEY_MANAGER_PASSWORD("zeppelin.ssl.key.manager.password", null), + ZEPPELIN_SSL_TRUSTSTORE_PATH("zeppelin.ssl.truststore.path", null), + ZEPPELIN_SSL_TRUSTSTORE_TYPE("zeppelin.ssl.truststore.type", null), + ZEPPELIN_SSL_TRUSTSTORE_PASSWORD("zeppelin.ssl.truststore.password", null), + ZEPPELIN_WAR("zeppelin.war", "../zeppelin-web/src/main/webapp"), + ZEPPELIN_API_WAR("zeppelin.api.war", "../zeppelin-docs/src/main/swagger"), + ZEPPELIN_INTERPRETERS("zeppelin.interpreters", "org.apache.zeppelin.spark.SparkInterpreter," + + "org.apache.zeppelin.spark.PySparkInterpreter," + + "org.apache.zeppelin.spark.SparkSqlInterpreter," + + "org.apache.zeppelin.spark.DepInterpreter," + + "org.apache.zeppelin.markdown.Markdown," + + "org.apache.zeppelin.shell.ShellInterpreter"), + ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"), + ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"), + ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"), + ZEPPELIN_INTERPRETER_REMOTE_RUNNER("zeppelin.interpreter.remoterunner", "bin/interpreter.sh"), + // Decide when new note is created, interpreter settings will be binded automatically or not. 
+ ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING("zeppelin.notebook.autoInterpreterBinding", true); + + private String varName; + @SuppressWarnings("rawtypes") + private Class varClass; + private String stringValue; + private VarType type; + private int intValue; + private float floatValue; + private boolean booleanValue; + private long longValue; + + + ConfVars(String varName, String varValue) { + this.varName = varName; + this.varClass = String.class; + this.stringValue = varValue; + this.intValue = -1; + this.floatValue = -1; + this.longValue = -1; + this.booleanValue = false; + this.type = VarType.STRING; + } + + ConfVars(String varName, int intValue) { + this.varName = varName; + this.varClass = Integer.class; + this.stringValue = null; + this.intValue = intValue; + this.floatValue = -1; + this.longValue = -1; + this.booleanValue = false; + this.type = VarType.INT; + } + + ConfVars(String varName, long longValue) { + this.varName = varName; + this.varClass = Integer.class; + this.stringValue = null; + this.intValue = -1; + this.floatValue = -1; + this.longValue = longValue; + this.booleanValue = false; + this.type = VarType.INT; + } + + ConfVars(String varName, float floatValue) { + this.varName = varName; + this.varClass = Float.class; + this.stringValue = null; + this.intValue = -1; + this.longValue = -1; + this.floatValue = floatValue; + this.booleanValue = false; + this.type = VarType.FLOAT; + } + + ConfVars(String varName, boolean booleanValue) { + this.varName = varName; + this.varClass = Boolean.class; + this.stringValue = null; + this.intValue = -1; + this.longValue = -1; + this.floatValue = -1; + this.booleanValue = booleanValue; + this.type = VarType.BOOLEAN; + } + + public String getVarName() { + return varName; + } + + @SuppressWarnings("rawtypes") + public Class getVarClass() { + return varClass; + } + + public int getIntValue() { + return intValue; + } + + public long getLongValue() { + return longValue; + } + + public float getFloatValue() { + 
return floatValue; + } + + public String getStringValue() { + return stringValue; + } + + public boolean getBooleanValue() { + return booleanValue; + } + + public VarType getType() { + return type; + } + + enum VarType { + STRING { + @Override + void checkType(String value) throws Exception {} + }, + INT { + @Override + void checkType(String value) throws Exception { + Integer.valueOf(value); + } + }, + LONG { + @Override + void checkType(String value) throws Exception { + Long.valueOf(value); + } + }, + FLOAT { + @Override + void checkType(String value) throws Exception { + Float.valueOf(value); + } + }, + BOOLEAN { + @Override + void checkType(String value) throws Exception { + Boolean.valueOf(value); + } + }; + + boolean isType(String value) { + try { + checkType(value); + } catch (Exception e) { + return false; + } + return true; + } + + String typeString() { + return name().toUpperCase(); + } + + abstract void checkType(String value) throws Exception; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java new file mode 100644 index 0000000..7c81e90 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -0,0 +1,613 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +/** + * Manage interpreters. 
+ * + */ +public class InterpreterFactory { + Logger logger = LoggerFactory.getLogger(InterpreterFactory.class); + + private Map<String, URLClassLoader> cleanCl = Collections + .synchronizedMap(new HashMap<String, URLClassLoader>()); + + private ZeppelinConfiguration conf; + String[] interpreterClassList; + + private Map<String, InterpreterSetting> interpreterSettings = + new HashMap<String, InterpreterSetting>(); + + private Map<String, List<String>> interpreterBindings = new HashMap<String, List<String>>(); + + private Gson gson; + + private InterpreterOption defaultOption; + + public InterpreterFactory(ZeppelinConfiguration conf) throws InterpreterException, IOException { + this(conf, new InterpreterOption(true)); + } + + + public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption) + throws InterpreterException, IOException { + this.conf = conf; + this.defaultOption = defaultOption; + String replsConf = conf.getString(ConfVars.ZEPPELIN_INTERPRETERS); + interpreterClassList = replsConf.split(","); + + GsonBuilder builder = new GsonBuilder(); + builder.setPrettyPrinting(); + builder.registerTypeAdapter(Interpreter.class, new InterpreterSerializer()); + gson = builder.create(); + + init(); + } + + private void init() throws InterpreterException, IOException { + ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); + + // Load classes + File[] interpreterDirs = new File(conf.getInterpreterDir()).listFiles(); + if (interpreterDirs != null) { + for (File path : interpreterDirs) { + logger.info("Reading " + path.getAbsolutePath()); + URL[] urls = null; + try { + urls = recursiveBuildLibList(path); + } catch (MalformedURLException e1) { + logger.error("Can't load jars ", e1); + } + URLClassLoader ccl = new URLClassLoader(urls, oldcl); + + for (String className : interpreterClassList) { + try { + Class.forName(className, true, ccl); + Set<String> keys = Interpreter.registeredInterpreters.keySet(); + for (String intName : keys) { 
+ if (className.equals( + Interpreter.registeredInterpreters.get(intName).getClassName())) { + Interpreter.registeredInterpreters.get(intName).setPath(path.getAbsolutePath()); + logger.info("Interpreter " + intName + " found. class=" + className); + cleanCl.put(path.getAbsolutePath(), ccl); + } + } + } catch (ClassNotFoundException e) { + // nothing to do + } + } + } + } + + loadFromFile(); + + // if no interpreter settings are loaded, create default set + synchronized (interpreterSettings) { + if (interpreterSettings.size() == 0) { + HashMap<String, List<RegisteredInterpreter>> groupClassNameMap = + new HashMap<String, List<RegisteredInterpreter>>(); + + for (String k : Interpreter.registeredInterpreters.keySet()) { + RegisteredInterpreter info = Interpreter.registeredInterpreters.get(k); + + if (!groupClassNameMap.containsKey(info.getGroup())) { + groupClassNameMap.put(info.getGroup(), new LinkedList<RegisteredInterpreter>()); + } + + groupClassNameMap.get(info.getGroup()).add(info); + } + + for (String className : interpreterClassList) { + for (String groupName : groupClassNameMap.keySet()) { + List<RegisteredInterpreter> infos = groupClassNameMap.get(groupName); + + boolean found = false; + Properties p = new Properties(); + for (RegisteredInterpreter info : infos) { + if (found == false && info.getClassName().equals(className)) { + found = true; + } + + for (String k : info.getProperties().keySet()) { + p.put(k, info.getProperties().get(k).getDefaultValue()); + } + } + + if (found) { + // add all interpreters in group + add(groupName, groupName, defaultOption, p); + groupClassNameMap.remove(groupName); + break; + } + } + } + } + } + + for (String settingId : interpreterSettings.keySet()) { + InterpreterSetting setting = interpreterSettings.get(settingId); + logger.info("Interpreter setting group {} : id={}, name={}", + setting.getGroup(), settingId, setting.getName()); + for (Interpreter interpreter : setting.getInterpreterGroup()) { + logger.info(" className 
= {}", interpreter.getClassName()); + } + } + } + + private void loadFromFile() throws IOException { + GsonBuilder builder = new GsonBuilder(); + builder.setPrettyPrinting(); + builder.registerTypeAdapter(Interpreter.class, new InterpreterSerializer()); + Gson gson = builder.create(); + + File settingFile = new File(conf.getInterpreterSettingPath()); + if (!settingFile.exists()) { + // nothing to read + return; + } + FileInputStream fis = new FileInputStream(settingFile); + InputStreamReader isr = new InputStreamReader(fis); + BufferedReader bufferedReader = new BufferedReader(isr); + StringBuilder sb = new StringBuilder(); + String line; + while ((line = bufferedReader.readLine()) != null) { + sb.append(line); + } + isr.close(); + fis.close(); + + String json = sb.toString(); + InterpreterInfoSaving info = gson.fromJson(json, InterpreterInfoSaving.class); + + for (String k : info.interpreterSettings.keySet()) { + InterpreterSetting setting = info.interpreterSettings.get(k); + + // Always use separate interpreter process + // While we decided to turn this feature on always (without providing + // enable/disable option on GUI). + // previously created setting should turn this feature on here. 
+ setting.getOption().setRemote(true); + + InterpreterGroup interpreterGroup = createInterpreterGroup( + setting.getGroup(), + setting.getOption(), + setting.getProperties()); + + InterpreterSetting intpSetting = new InterpreterSetting( + setting.id(), + setting.getName(), + setting.getGroup(), + setting.getOption(), + interpreterGroup); + + interpreterSettings.put(k, intpSetting); + } + + this.interpreterBindings = info.interpreterBindings; + } + + + private void saveToFile() throws IOException { + String jsonString; + + synchronized (interpreterSettings) { + InterpreterInfoSaving info = new InterpreterInfoSaving(); + info.interpreterBindings = interpreterBindings; + info.interpreterSettings = interpreterSettings; + + jsonString = gson.toJson(info); + } + + File settingFile = new File(conf.getInterpreterSettingPath()); + if (!settingFile.exists()) { + settingFile.createNewFile(); + } + + FileOutputStream fos = new FileOutputStream(settingFile, false); + OutputStreamWriter out = new OutputStreamWriter(fos); + out.append(jsonString); + out.close(); + fos.close(); + } + + private RegisteredInterpreter getRegisteredReplInfoFromClassName(String clsName) { + Set<String> keys = Interpreter.registeredInterpreters.keySet(); + for (String intName : keys) { + RegisteredInterpreter info = Interpreter.registeredInterpreters.get(intName); + if (clsName.equals(info.getClassName())) { + return info; + } + } + return null; + } + + /** + * Return ordered interpreter setting list. + * The list does not contain more than one setting from the same interpreter class. 
+ * Order by InterpreterClass (order defined by ZEPPELIN_INTERPRETERS), Interpreter setting name + * @return + */ + public List<String> getDefaultInterpreterSettingList() { + // this list will contain default interpreter setting list + List<String> defaultSettings = new LinkedList<String>(); + + // to ignore the same interpreter group + Map<String, Boolean> interpreterGroupCheck = new HashMap<String, Boolean>(); + + List<InterpreterSetting> sortedSettings = get(); + + for (InterpreterSetting setting : sortedSettings) { + if (defaultSettings.contains(setting.id())) { + continue; + } + + if (!interpreterGroupCheck.containsKey(setting.getGroup())) { + defaultSettings.add(setting.id()); + interpreterGroupCheck.put(setting.getGroup(), true); + } + } + return defaultSettings; + } + + public List<RegisteredInterpreter> getRegisteredInterpreterList() { + List<RegisteredInterpreter> registeredInterpreters = new LinkedList<RegisteredInterpreter>(); + + for (String className : interpreterClassList) { + registeredInterpreters.add(Interpreter.findRegisteredInterpreterByClassName(className)); + } + + return registeredInterpreters; + } + + /** + * @param name user defined name + * @param groupName interpreter group name to instantiate + * @param properties + * @return + * @throws InterpreterException + * @throws IOException + */ + public InterpreterGroup add(String name, String groupName, + InterpreterOption option, Properties properties) + throws InterpreterException, IOException { + synchronized (interpreterSettings) { + InterpreterGroup interpreterGroup = createInterpreterGroup(groupName, option, properties); + + InterpreterSetting intpSetting = new InterpreterSetting( + name, + groupName, + option, + interpreterGroup); + interpreterSettings.put(intpSetting.id(), intpSetting); + + saveToFile(); + return interpreterGroup; + } + } + + private InterpreterGroup createInterpreterGroup(String groupName, + InterpreterOption option, + Properties properties) + throws 
InterpreterException { + InterpreterGroup interpreterGroup = new InterpreterGroup(); + + for (String className : interpreterClassList) { + Set<String> keys = Interpreter.registeredInterpreters.keySet(); + for (String intName : keys) { + RegisteredInterpreter info = Interpreter.registeredInterpreters + .get(intName); + if (info.getClassName().equals(className) + && info.getGroup().equals(groupName)) { + Interpreter intp; + + if (option.isRemote()) { + intp = createRemoteRepl(info.getPath(), + info.getClassName(), + properties); + } else { + intp = createRepl(info.getPath(), + info.getClassName(), + properties); + } + interpreterGroup.add(intp); + intp.setInterpreterGroup(interpreterGroup); + break; + } + } + } + return interpreterGroup; + } + + public void remove(String id) throws IOException { + synchronized (interpreterSettings) { + if (interpreterSettings.containsKey(id)) { + InterpreterSetting intp = interpreterSettings.get(id); + intp.getInterpreterGroup().close(); + intp.getInterpreterGroup().destroy(); + + interpreterSettings.remove(id); + for (List<String> settings : interpreterBindings.values()) { + Iterator<String> it = settings.iterator(); + while (it.hasNext()) { + String settingId = it.next(); + if (settingId.equals(id)) { + it.remove(); + } + } + } + saveToFile(); + } + } + } + + /** + * Get loaded interpreters + * @return + */ + public List<InterpreterSetting> get() { + synchronized (interpreterSettings) { + List<InterpreterSetting> orderedSettings = new LinkedList<InterpreterSetting>(); + List<InterpreterSetting> settings = new LinkedList<InterpreterSetting>( + interpreterSettings.values()); + Collections.sort(settings, new Comparator<InterpreterSetting>(){ + @Override + public int compare(InterpreterSetting o1, InterpreterSetting o2) { + return o1.getName().compareTo(o2.getName()); + } + }); + + for (String className : interpreterClassList) { + for (InterpreterSetting setting : settings) { + for (InterpreterSetting orderedSetting : orderedSettings) 
{ + if (orderedSetting.id().equals(setting.id())) { + continue; + } + } + + for (Interpreter intp : setting.getInterpreterGroup()) { + if (className.equals(intp.getClassName())) { + boolean alreadyAdded = false; + for (InterpreterSetting st : orderedSettings) { + if (setting.id().equals(st.id())) { + alreadyAdded = true; + } + } + if (alreadyAdded == false) { + orderedSettings.add(setting); + } + } + } + } + } + return orderedSettings; + } + } + + public InterpreterSetting get(String name) { + synchronized (interpreterSettings) { + return interpreterSettings.get(name); + } + } + + public void putNoteInterpreterSettingBinding(String noteId, + List<String> settingList) throws IOException { + synchronized (interpreterSettings) { + interpreterBindings.put(noteId, settingList); + saveToFile(); + } + } + + public void removeNoteInterpreterSettingBinding(String noteId) { + synchronized (interpreterSettings) { + interpreterBindings.remove(noteId); + } + } + + public List<String> getNoteInterpreterSettingBinding(String noteId) { + LinkedList<String> bindings = new LinkedList<String>(); + synchronized (interpreterSettings) { + List<String> settingIds = interpreterBindings.get(noteId); + if (settingIds != null) { + bindings.addAll(settingIds); + } + } + return bindings; + } + + /** + * Change interpreter property and restart + * @param name + * @param properties + * @throws IOException + */ + public void setPropertyAndRestart(String id, InterpreterOption option, + Properties properties) throws IOException { + synchronized (interpreterSettings) { + InterpreterSetting intpsetting = interpreterSettings.get(id); + if (intpsetting != null) { + intpsetting.getInterpreterGroup().close(); + intpsetting.getInterpreterGroup().destroy(); + + intpsetting.setOption(option); + + InterpreterGroup interpreterGroup = createInterpreterGroup( + intpsetting.getGroup(), option, properties); + intpsetting.setInterpreterGroup(interpreterGroup); + saveToFile(); + } else { + throw new 
InterpreterException("Interpreter setting id " + id + + " not found"); + } + } + } + + public void restart(String id) { + synchronized (interpreterSettings) { + synchronized (interpreterSettings) { + InterpreterSetting intpsetting = interpreterSettings.get(id); + if (intpsetting != null) { + intpsetting.getInterpreterGroup().close(); + intpsetting.getInterpreterGroup().destroy(); + + InterpreterGroup interpreterGroup = createInterpreterGroup( + intpsetting.getGroup(), intpsetting.getOption(), intpsetting.getProperties()); + intpsetting.setInterpreterGroup(interpreterGroup); + } else { + throw new InterpreterException("Interpreter setting id " + id + + " not found"); + } + } + } + } + + + public void close() { + synchronized (interpreterSettings) { + synchronized (interpreterSettings) { + Collection<InterpreterSetting> intpsettings = interpreterSettings.values(); + for (InterpreterSetting intpsetting : intpsettings) { + intpsetting.getInterpreterGroup().close(); + intpsetting.getInterpreterGroup().destroy(); + } + } + } + } + + private Interpreter createRepl(String dirName, String className, + Properties property) + throws InterpreterException { + logger.info("Create repl {} from {}", className, dirName); + + ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); + try { + + URLClassLoader ccl = cleanCl.get(dirName); + if (ccl == null) { + // classloader fallback + ccl = URLClassLoader.newInstance(new URL[] {}, oldcl); + } + + boolean separateCL = true; + try { // check if server's classloader has driver already. + Class cls = this.getClass().forName(className); + if (cls != null) { + separateCL = false; + } + } catch (Exception e) { + // nothing to do. 
+ } + + URLClassLoader cl; + + if (separateCL == true) { + cl = URLClassLoader.newInstance(new URL[] {}, ccl); + } else { + cl = ccl; + } + Thread.currentThread().setContextClassLoader(cl); + + Class<Interpreter> replClass = (Class<Interpreter>) cl.loadClass(className); + Constructor<Interpreter> constructor = + replClass.getConstructor(new Class[] {Properties.class}); + Interpreter repl = constructor.newInstance(property); + repl.setClassloaderUrls(ccl.getURLs()); + LazyOpenInterpreter intp = new LazyOpenInterpreter( + new ClassloaderInterpreter(repl, cl)); + return intp; + } catch (SecurityException e) { + throw new InterpreterException(e); + } catch (NoSuchMethodException e) { + throw new InterpreterException(e); + } catch (IllegalArgumentException e) { + throw new InterpreterException(e); + } catch (InstantiationException e) { + throw new InterpreterException(e); + } catch (IllegalAccessException e) { + throw new InterpreterException(e); + } catch (InvocationTargetException e) { + throw new InterpreterException(e); + } catch (ClassNotFoundException e) { + throw new InterpreterException(e); + } finally { + Thread.currentThread().setContextClassLoader(oldcl); + } + } + + + private Interpreter createRemoteRepl(String interpreterPath, String className, + Properties property) { + + LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter( + property, className, conf.getInterpreterRemoteRunnerPath(), interpreterPath)); + return intp; + } + + + private URL[] recursiveBuildLibList(File path) throws MalformedURLException { + URL[] urls = new URL[0]; + if (path == null || path.exists() == false) { + return urls; + } else if (path.getName().startsWith(".")) { + return urls; + } else if (path.isDirectory()) { + File[] files = path.listFiles(); + if (files != null) { + for (File f : files) { + urls = (URL[]) ArrayUtils.addAll(urls, recursiveBuildLibList(f)); + } + } + return urls; + } else { + return new URL[] {path.toURI().toURL()}; + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java new file mode 100644 index 0000000..ae507d4 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSaving.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.zeppelin.interpreter;

import java.util.List;
import java.util.Map;

/**
 * Plain data holder that groups the two interpreter maps together.
 * NOTE(review): presumably the object that is serialized when interpreter
 * settings are saved to file - confirm against the code that writes it.
 */
public class InterpreterInfoSaving {
  /** Interpreter settings by key (presumably the setting id - confirm). */
  public Map<String, InterpreterSetting> interpreterSettings;

  /** Lists of strings by key (presumably note id to bound setting ids - confirm). */
  public Map<String, List<String>> interpreterBindings;
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java new file mode 100644 index 0000000..e2adecd --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.zeppelin.interpreter;

/**
 * Options of an interpreter setting. Currently a single flag: whether the
 * interpreter is remote.
 */
public class InterpreterOption {
  boolean remote;

  /** Creates an option with {@code remote} switched off. */
  public InterpreterOption() {
    this(false);
  }

  /**
   * @param remote initial value of the remote flag
   */
  public InterpreterOption(boolean remote) {
    this.remote = remote;
  }

  /**
   * @return current value of the remote flag
   */
  public boolean isRemote() {
    return this.remote;
  }

  /**
   * @param remote new value of the remote flag
   */
  public void setRemote(boolean remote) {
    this.remote = remote;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSerializer.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSerializer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSerializer.java new file mode 100644 index 0000000..a2deb7e --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSerializer.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.zeppelin.interpreter;

import java.lang.reflect.Type;

import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;


/**
 * Gson adapter for {@link Interpreter}. Serialization writes only the class
 * name and the registered interpreter name; deserialization is not supported
 * and always yields null.
 */
public class InterpreterSerializer implements JsonSerializer<Interpreter>,
    JsonDeserializer<Interpreter> {

  /**
   * Writes the interpreter as an object with two properties: {@code "class"}
   * (the interpreter's class name) and {@code "name"} (the name of the
   * interpreter registered for that class).
   */
  @Override
  public JsonElement serialize(Interpreter interpreter, Type type,
      JsonSerializationContext context) {
    String className = interpreter.getClassName();
    String registeredName =
        Interpreter.findRegisteredInterpreterByClassName(interpreter.getClassName()).getName();

    JsonObject result = new JsonObject();
    result.addProperty("class", className);
    result.addProperty("name", registeredName);
    return result;
  }

  /** Interpreters are never rebuilt from json; always returns null. */
  @Override
  public Interpreter deserialize(JsonElement json, Type typeOfT,
      JsonDeserializationContext context) throws JsonParseException {
    return null;
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java new file mode 100644 index 0000000..04785aa --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.zeppelin.interpreter;

import java.util.Properties;
import java.util.Random;

import org.apache.zeppelin.notebook.utility.IdHashes;

/**
 * One configured interpreter setting: an id, a display name, the interpreter
 * group name, an option, and the live {@link InterpreterGroup} together with
 * the properties taken from that group.
 */
public class InterpreterSetting {
  private String id;
  private String name;
  private String group;
  private String description;
  private Properties properties;
  private InterpreterGroup interpreterGroup;
  private InterpreterOption option;

  /**
   * Creates a setting with an explicit id.
   *
   * @param id setting id
   * @param name setting name
   * @param group interpreter group name
   * @param option setting option
   * @param interpreterGroup live interpreter group; its properties become the
   *        properties of this setting
   */
  public InterpreterSetting(String id, String name,
      String group,
      InterpreterOption option,
      InterpreterGroup interpreterGroup) {
    this.id = id;
    this.name = name;
    this.group = group;
    this.option = option;
    this.interpreterGroup = interpreterGroup;
    this.properties = interpreterGroup.getProperty();
  }

  /**
   * Creates a setting with a freshly generated id.
   */
  public InterpreterSetting(String name,
      String group,
      InterpreterOption option,
      InterpreterGroup interpreterGroup) {
    this(generateId(), name, group, option, interpreterGroup);
  }

  /**
   * @return the setting id
   */
  public String id() {
    return id;
  }

  /** Encodes the current time plus a random int into an id string. */
  private static String generateId() {
    long seed = System.currentTimeMillis() + new Random().nextInt();
    return IdHashes.encode(seed);
  }

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public String getDescription() {
    return description;
  }

  public void setDescription(String desc) {
    this.description = desc;
  }

  public String getGroup() {
    return group;
  }

  public InterpreterGroup getInterpreterGroup() {
    return interpreterGroup;
  }

  /**
   * Replaces the live interpreter group and refreshes the properties of this
   * setting from the new group.
   */
  public void setInterpreterGroup(InterpreterGroup interpreterGroup) {
    this.properties = interpreterGroup.getProperty();
    this.interpreterGroup = interpreterGroup;
  }

  public Properties getProperties() {
    return properties;
  }

  /**
   * @return the option; when none is set yet, a default
   *         {@link InterpreterOption} is created, stored, and returned
   */
  public InterpreterOption getOption() {
    if (option != null) {
      return option;
    }
    option = new InterpreterOption();
    return option;
  }

  public void setOption(InterpreterOption option) {
    this.option = option;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java new file mode 100644 index 0000000..23cd957 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/JobListenerFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.zeppelin.notebook;

import org.apache.zeppelin.scheduler.JobListener;

/**
 * Factory of the {@link JobListener} that is attached to the paragraphs of a
 * note.
 *
 * @author Leemoonsoo
 *
 */
public interface JobListenerFactory {
  /**
   * @param note the note whose paragraphs will be listened to
   * @return the listener to set on a paragraph of {@code note}
   */
  JobListener getParagraphJobListener(Note note);
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java new file mode 100644 index 0000000..9204a07 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.zeppelin.notebook;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.notebook.utility.IdHashes;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.JobListener;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.Job.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

/**
 * A note: an id, a name, and an ordered list of paragraphs, plus free-form
 * config and info maps. A note is stored as json in
 * {@code <notebook dir>/<id>/note.json}. All access to the paragraph list is
 * synchronized on the list itself.
 */
public class Note implements Serializable, JobListener {
  transient Logger logger = LoggerFactory.getLogger(Note.class);
  List<Paragraph> paragraphs = new LinkedList<Paragraph>();
  private String name;
  private String id;

  private transient NoteInterpreterLoader replLoader;
  private transient ZeppelinConfiguration conf;
  private transient JobListenerFactory jobListenerFactory;

  /**
   * note configurations.
   *
   * - looknfeel - cron
   */
  private Map<String, Object> config = new HashMap<String, Object>();

  /**
   * note information.
   *
   * - cron : cron expression validity.
   */
  private Map<String, Object> info = new HashMap<String, Object>();

  /** No-arg constructor; leaves id, conf, and loaders unset. */
  public Note() {}

  /**
   * Creates an empty note with a freshly generated id.
   *
   * @param conf configuration (notebook directory, encoding)
   * @param replLoader interpreter loader of this note
   * @param jobListenerFactory source of the listener set on paragraphs before they run
   * @param quartzSched not used by this constructor
   */
  public Note(ZeppelinConfiguration conf, NoteInterpreterLoader replLoader,
      JobListenerFactory jobListenerFactory, org.quartz.Scheduler quartzSched) {
    this.conf = conf;
    this.replLoader = replLoader;
    this.jobListenerFactory = jobListenerFactory;
    generateId();
  }

  private void generateId() {
    id = IdHashes.encode(System.currentTimeMillis() + new Random().nextInt());
  }

  public String id() {
    return id;
  }

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public NoteInterpreterLoader getNoteReplLoader() {
    return replLoader;
  }

  public void setReplLoader(NoteInterpreterLoader replLoader) {
    this.replLoader = replLoader;
  }

  public void setZeppelinConfiguration(ZeppelinConfiguration conf) {
    this.conf = conf;
  }

  /**
   * Add paragraph last.
   *
   * @return the newly created paragraph
   */
  public Paragraph addParagraph() {
    Paragraph p = new Paragraph(this, replLoader);
    synchronized (paragraphs) {
      paragraphs.add(p);
    }
    return p;
  }

  /**
   * Insert paragraph in given index.
   *
   * @param index position of the new paragraph
   * @return the newly created paragraph
   */
  public Paragraph insertParagraph(int index) {
    Paragraph p = new Paragraph(this, replLoader);
    synchronized (paragraphs) {
      paragraphs.add(index, p);
    }
    return p;
  }

  /**
   * Remove paragraph by id.
   *
   * @param paragraphId id of the paragraph to remove
   * @return the removed paragraph, or null when no paragraph has that id
   */
  public Paragraph removeParagraph(String paragraphId) {
    synchronized (paragraphs) {
      for (int i = 0; i < paragraphs.size(); i++) {
        Paragraph p = paragraphs.get(i);
        if (p.getId().equals(paragraphId)) {
          paragraphs.remove(i);
          return p;
        }
      }
    }
    return null;
  }

  /**
   * Move paragraph into the new index (order from 0 ~ n-1). Does nothing when
   * the index is out of range, the paragraph is unknown, or it already sits at
   * that index.
   *
   * @param paragraphId id of the paragraph to move
   * @param index new index
   */
  public void moveParagraph(String paragraphId, int index) {
    synchronized (paragraphs) {
      if (index < 0 || index >= paragraphs.size()) {
        return;
      }

      Paragraph moved = null;
      for (int i = 0; i < paragraphs.size(); i++) {
        if (paragraphs.get(i).getId().equals(paragraphId)) {
          if (i == index) {
            return;
          }
          moved = paragraphs.remove(i);
          break; // found it; no need to scan the rest of the list
        }
      }

      if (moved != null) {
        // The target index is the same whether the paragraph moves up or down.
        paragraphs.add(index, moved);
      }
    }
  }

  /**
   * @param paragraphId paragraph id to test
   * @return true when the id belongs to the last paragraph, and also when the
   *         note has no paragraph at all
   */
  public boolean isLastParagraph(String paragraphId) {
    synchronized (paragraphs) {
      if (paragraphs.isEmpty()) {
        /** because empty list, cannot remove nothing right? */
        return true;
      }
      return paragraphId.equals(paragraphs.get(paragraphs.size() - 1).getId());
    }
  }

  /**
   * @param paragraphId paragraph id
   * @return the paragraph with that id, or null when there is none
   */
  public Paragraph getParagraph(String paragraphId) {
    synchronized (paragraphs) {
      for (Paragraph p : paragraphs) {
        if (p.getId().equals(paragraphId)) {
          return p;
        }
      }
    }
    return null;
  }

  /**
   * @return the last paragraph; the list lookup fails with an index exception
   *         when the note is empty
   */
  public Paragraph getLastParagraph() {
    synchronized (paragraphs) {
      return paragraphs.get(paragraphs.size() - 1);
    }
  }

  /**
   * Run all paragraphs sequentially.
   *
   * @throws InterpreterException when a paragraph's interpreter cannot be found
   */
  public void runAll() {
    synchronized (paragraphs) {
      for (Paragraph p : paragraphs) {
        submitToInterpreter(p);
      }
    }
  }

  /**
   * Run a single paragraph.
   *
   * @param paragraphId id of the paragraph to run; must exist in this note
   * @throws InterpreterException when the paragraph's interpreter cannot be found
   */
  public void run(String paragraphId) {
    submitToInterpreter(getParagraph(paragraphId));
  }

  /** Wires loader and listener into the paragraph and submits it to its interpreter's scheduler. */
  private void submitToInterpreter(Paragraph p) {
    p.setNoteReplLoader(replLoader);
    p.setListener(jobListenerFactory.getParagraphJobListener(this));
    Interpreter intp = replLoader.get(p.getRequiredReplName());
    if (intp == null) {
      throw new InterpreterException("Interpreter " + p.getRequiredReplName() + " not found");
    }
    intp.getScheduler().submit(p);
  }

  /**
   * Delegates code completion to the paragraph.
   *
   * @param paragraphId id of the paragraph; must exist in this note
   * @param buffer text handed to the paragraph's completion
   * @param cursor cursor position handed to the paragraph's completion
   * @return whatever the paragraph's completion returns
   */
  public List<String> completion(String paragraphId, String buffer, int cursor) {
    Paragraph p = getParagraph(paragraphId);
    p.setNoteReplLoader(replLoader);
    p.setListener(jobListenerFactory.getParagraphJobListener(this));
    return p.completion(buffer, cursor);
  }

  /**
   * @return a copy of the paragraph list
   */
  public List<Paragraph> getParagraphs() {
    synchronized (paragraphs) {
      return new LinkedList<Paragraph>(paragraphs);
    }
  }

  /**
   * Writes this note as pretty-printed json to
   * {@code <notebook dir>/<id>/note.json}, creating the directory if needed.
   *
   * @throws IOException when the file cannot be written
   * @throws RuntimeException when the note directory path is an existing plain file
   */
  public void persist() throws IOException {
    GsonBuilder gsonBuilder = new GsonBuilder();
    gsonBuilder.setPrettyPrinting();
    Gson gson = gsonBuilder.create();

    File dir = new File(conf.getNotebookDir() + "/" + id);
    if (!dir.exists()) {
      dir.mkdirs();
    } else if (dir.isFile()) {
      throw new RuntimeException("File already exists" + dir.toString());
    }

    File file = new File(conf.getNotebookDir() + "/" + id + "/note.json");
    logger().info("Persist note {} into {}", id, file.getAbsolutePath());

    String json = gson.toJson(this);
    FileOutputStream out = new FileOutputStream(file);
    try {
      out.write(json.getBytes(conf.getString(ConfVars.ZEPPELIN_ENCODING)));
    } finally {
      // Release the file handle even when encoding or writing fails.
      out.close();
    }
  }

  /**
   * Deletes the directory of this note.
   *
   * @throws IOException when the directory cannot be deleted
   */
  public void unpersist() throws IOException {
    File dir = new File(conf.getNotebookDir() + "/" + id);

    FileUtils.deleteDirectory(dir);
  }

  /**
   * Reads a note from {@code <notebook dir>/<id>/note.json}. Paragraphs that
   * were stored as PENDING or RUNNING are marked ABORT.
   *
   * @param id note id
   * @param conf configuration (notebook directory, encoding); set on the loaded note
   * @param replLoader interpreter loader set on the loaded note
   * @param scheduler not used by this method
   * @param jobListenerFactory listener factory set on the loaded note
   * @param quartzSched not used by this method
   * @return the loaded note, or null when the json file does not exist
   * @throws IOException when the file cannot be read
   */
  public static Note load(String id, ZeppelinConfiguration conf, NoteInterpreterLoader replLoader,
      Scheduler scheduler, JobListenerFactory jobListenerFactory, org.quartz.Scheduler quartzSched)
      throws IOException {
    GsonBuilder gsonBuilder = new GsonBuilder();
    gsonBuilder.setPrettyPrinting();
    Gson gson = gsonBuilder.create();

    File file = new File(conf.getNotebookDir() + "/" + id + "/note.json");
    logger().info("Load note {} from {}", id, file.getAbsolutePath());

    if (!file.isFile()) {
      return null;
    }

    String json;
    FileInputStream ins = new FileInputStream(file);
    try {
      json = IOUtils.toString(ins, conf.getString(ConfVars.ZEPPELIN_ENCODING));
    } finally {
      // The stream was never closed before; release it on every path.
      ins.close();
    }

    Note note = gson.fromJson(json, Note.class);
    note.setZeppelinConfiguration(conf);
    note.setReplLoader(replLoader);
    note.jobListenerFactory = jobListenerFactory;
    for (Paragraph p : note.paragraphs) {
      if (p.getStatus() == Status.PENDING || p.getStatus() == Status.RUNNING) {
        p.setStatus(Status.ABORT);
      }
    }

    return note;
  }

  /**
   * @return the config map; an empty map is created when none is set
   */
  public Map<String, Object> getConfig() {
    if (config == null) {
      config = new HashMap<String, Object>();
    }
    return config;
  }

  public void setConfig(Map<String, Object> config) {
    this.config = config;
  }

  /**
   * @return the info map; an empty map is created when none is set
   */
  public Map<String, Object> getInfo() {
    if (info == null) {
      info = new HashMap<String, Object>();
    }
    return info;
  }

  public void setInfo(Map<String, Object> info) {
    this.info = info;
  }

  @Override
  public void beforeStatusChange(Job job, Status before, Status after) {
    Paragraph p = (Paragraph) job;
  }

  @Override
  public void afterStatusChange(Job job, Status before, Status after) {
    Paragraph p = (Paragraph) job;
  }

  private static Logger logger() {
    Logger logger = LoggerFactory.getLogger(Note.class);
    return logger;
  }

  @Override
  public void onProgressUpdate(Job job, int progress) {}

}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java new file mode 100644 index
0000000..b1fd7b9 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterFactory; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterSetting; + +/** + * Repl loader per note. 
+ */
public class NoteInterpreterLoader {
  private transient InterpreterFactory factory;
  String noteId;

  /**
   * @param factory interpreter factory that stores the note bindings and the settings
   */
  public NoteInterpreterLoader(InterpreterFactory factory) {
    this.factory = factory;
  }

  /**
   * @param noteId id of the note this loader serves
   */
  public void setNoteId(String noteId) {
    this.noteId = noteId;
  }

  /**
   * set interpreter ids
   * @param ids InterpreterSetting id list
   * @throws IOException propagated from the factory
   */
  public void setInterpreters(List<String> ids) throws IOException {
    factory.putNoteInterpreterSettingBinding(noteId, ids);
  }

  /**
   * @return ids of the interpreter settings bound to the note, as reported by the factory
   */
  public List<String> getInterpreters() {
    return factory.getNoteInterpreterSettingBinding(noteId);
  }

  /**
   * Resolves the bound setting ids of the note into settings.
   *
   * @return the settings, in binding order; ids the factory no longer knows are left out
   */
  public List<InterpreterSetting> getInterpreterSettings() {
    List<String> interpreterSettingIds = factory.getNoteInterpreterSettingBinding(noteId);
    LinkedList<InterpreterSetting> settings = new LinkedList<InterpreterSetting>();
    synchronized (interpreterSettingIds) {
      LinkedList<String> staleIds = new LinkedList<String>();
      for (String id : interpreterSettingIds) {
        InterpreterSetting setting = factory.get(id);
        if (setting == null) {
          // interpreter setting is removed from factory. remove id from here, too
          staleIds.add(id);
        } else {
          settings.add(setting);
        }
      }
      // Removing inside the for-each above could throw
      // ConcurrentModificationException, so the stale ids are dropped afterwards.
      interpreterSettingIds.removeAll(staleIds);
    }
    return settings;
  }

  /**
   * Finds the interpreter a paragraph asks for.
   *
   * @param replName registered interpreter name, or null for the default
   * @return for a null name, the first interpreter of the first bound setting;
   *         otherwise the first interpreter among the bound settings whose
   *         class is the one registered under {@code replName}. Null when the
   *         note has no bound setting, when no interpreter is registered at
   *         all, or when no bound setting provides that class.
   * @throws InterpreterException when {@code replName} is not a registered interpreter name
   */
  public Interpreter get(String replName) {
    List<InterpreterSetting> settings = getInterpreterSettings();

    if (settings == null || settings.size() == 0) {
      return null;
    }

    if (replName == null) {
      return settings.get(0).getInterpreterGroup().getFirst();
    }

    if (Interpreter.registeredInterpreters == null) {
      return null;
    }
    Interpreter.RegisteredInterpreter registeredInterpreter
      = Interpreter.registeredInterpreters.get(replName);
    if (registeredInterpreter == null || registeredInterpreter.getClassName() == null) {
      throw new InterpreterException(replName + " interpreter not found");
    }
    String interpreterClassName = registeredInterpreter.getClassName();

    for (InterpreterSetting setting : settings) {
      InterpreterGroup intpGroup = setting.getInterpreterGroup();
      for (Interpreter interpreter : intpGroup) {
        if (interpreterClassName.equals(interpreter.getClassName())) {
          return interpreter;
        }
      }
    }

    return null;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java new file mode 100644 index 0000000..2d9ba36 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.zeppelin.interpreter.InterpreterFactory; +import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; +import org.quartz.CronScheduleBuilder; +import org.quartz.CronTrigger; +import org.quartz.JobBuilder; +import org.quartz.JobDetail; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.quartz.JobKey; +import org.quartz.SchedulerException; +import org.quartz.TriggerBuilder; +import org.quartz.impl.StdSchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Collection of Notes. + */ +public class Notebook { + Logger logger = LoggerFactory.getLogger(Notebook.class); + private SchedulerFactory schedulerFactory; + private InterpreterFactory replFactory; + /** Keep the order. 
*/ + Map<String, Note> notes = new LinkedHashMap<String, Note>(); + private ZeppelinConfiguration conf; + private StdSchedulerFactory quertzSchedFact; + private org.quartz.Scheduler quartzSched; + private JobListenerFactory jobListenerFactory; + + public Notebook(ZeppelinConfiguration conf, SchedulerFactory schedulerFactory, + InterpreterFactory replFactory, JobListenerFactory jobListenerFactory) throws IOException, + SchedulerException { + this.conf = conf; + this.schedulerFactory = schedulerFactory; + this.replFactory = replFactory; + this.jobListenerFactory = jobListenerFactory; + quertzSchedFact = new org.quartz.impl.StdSchedulerFactory(); + quartzSched = quertzSchedFact.getScheduler(); + quartzSched.start(); + CronJob.notebook = this; + + loadAllNotes(); + } + + /** + * Create new note. + * + * @return + * @throws IOException + */ + public Note createNote() throws IOException { + if (conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING)) { + return createNote(replFactory.getDefaultInterpreterSettingList()); + } else { + return createNote(null); + } + } + + /** + * Create new note. 
+ * + * @return + * @throws IOException + */ + public Note createNote(List<String> interpreterIds) throws IOException { + NoteInterpreterLoader intpLoader = new NoteInterpreterLoader(replFactory); + Note note = new Note(conf, intpLoader, jobListenerFactory, quartzSched); + intpLoader.setNoteId(note.id()); + synchronized (notes) { + notes.put(note.id(), note); + } + if (interpreterIds != null) { + bindInterpretersToNote(note.id(), interpreterIds); + } + + return note; + } + + public void bindInterpretersToNote(String id, + List<String> interpreterSettingIds) throws IOException { + Note note = getNote(id); + if (note != null) { + note.getNoteReplLoader().setInterpreters(interpreterSettingIds); + replFactory.putNoteInterpreterSettingBinding(id, interpreterSettingIds); + } + } + + public List<String> getBindedInterpreterSettingsIds(String id) { + Note note = getNote(id); + if (note != null) { + return note.getNoteReplLoader().getInterpreters(); + } else { + return new LinkedList<String>(); + } + } + + public List<InterpreterSetting> getBindedInterpreterSettings(String id) { + Note note = getNote(id); + if (note != null) { + return note.getNoteReplLoader().getInterpreterSettings(); + } else { + return new LinkedList<InterpreterSetting>(); + } + } + + public Note getNote(String id) { + synchronized (notes) { + return notes.get(id); + } + } + + public void removeNote(String id) { + Note note; + synchronized (notes) { + note = notes.remove(id); + } + try { + note.unpersist(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + private void loadAllNotes() throws IOException { + File notebookDir = new File(conf.getNotebookDir()); + File[] dirs = notebookDir.listFiles(); + if (dirs == null) { + return; + } + for (File f : dirs) { + boolean isHidden = f.getName().startsWith("."); + if (f.isDirectory() && !isHidden) { + Scheduler scheduler = + schedulerFactory.createOrGetFIFOScheduler("note_" + System.currentTimeMillis()); + logger.info("Loading note from " + 
f.getName()); + NoteInterpreterLoader noteInterpreterLoader = new NoteInterpreterLoader(replFactory); + Note note = Note.load(f.getName(), + conf, + noteInterpreterLoader, + scheduler, + jobListenerFactory, quartzSched); + noteInterpreterLoader.setNoteId(note.id()); + + synchronized (notes) { + notes.put(note.id(), note); + refreshCron(note.id()); + } + } + } + } + + public List<Note> getAllNotes() { + synchronized (notes) { + List<Note> noteList = new ArrayList<Note>(notes.values()); + logger.info("" + noteList.size()); + Collections.sort(noteList, new Comparator() { + @Override + public int compare(Object one, Object two) { + Note note1 = (Note) one; + Note note2 = (Note) two; + + String name1 = note1.id(); + if (note1.getName() != null) { + name1 = note1.getName(); + } + String name2 = note2.id(); + if (note2.getName() != null) { + name2 = note2.getName(); + } + ((Note) one).getName(); + return name1.compareTo(name2); + } + }); + return noteList; + } + } + + public JobListenerFactory getJobListenerFactory() { + return jobListenerFactory; + } + + public void setJobListenerFactory(JobListenerFactory jobListenerFactory) { + this.jobListenerFactory = jobListenerFactory; + } + + /** + * Cron task for the note. 
+ * + * @author Leemoonsoo + * + */ + public static class CronJob implements org.quartz.Job { + public static Notebook notebook; + + @Override + public void execute(JobExecutionContext context) throws JobExecutionException { + + String noteId = context.getJobDetail().getJobDataMap().getString("noteId"); + Note note = notebook.getNote(noteId); + note.runAll(); + } + } + + public void refreshCron(String id) { + removeCron(id); + synchronized (notes) { + + Note note = notes.get(id); + if (note == null) { + return; + } + Map<String, Object> config = note.getConfig(); + if (config == null) { + return; + } + + String cronExpr = (String) note.getConfig().get("cron"); + if (cronExpr == null || cronExpr.trim().length() == 0) { + return; + } + + + JobDetail newJob = + JobBuilder.newJob(CronJob.class).withIdentity(id, "note").usingJobData("noteId", id) + .build(); + + Map<String, Object> info = note.getInfo(); + info.put("cron", null); + + CronTrigger trigger = null; + try { + trigger = + TriggerBuilder.newTrigger().withIdentity("trigger_" + id, "note") + .withSchedule(CronScheduleBuilder.cronSchedule(cronExpr)).forJob(id, "note") + .build(); + } catch (Exception e) { + logger.error("Error", e); + info.put("cron", e.getMessage()); + } + + + try { + if (trigger != null) { + quartzSched.scheduleJob(newJob, trigger); + } + } catch (SchedulerException e) { + logger.error("Error", e); + info.put("cron", "Scheduler Exception"); + } + } + } + + private void removeCron(String id) { + try { + quartzSched.deleteJob(new JobKey(id, "note")); + } catch (SchedulerException e) { + logger.error("Can't remove quertz " + id, e); + } + } + + public InterpreterFactory getInterpreterFactory() { + return replFactory; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java ---------------------------------------------------------------------- diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java new file mode 100644 index 0000000..e0986bf --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.display.Input; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.Interpreter.FormType; +import org.apache.zeppelin.scheduler.Job; +import org.apache.zeppelin.scheduler.JobListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Paragraph is a representation of an execution unit. 
+ * + * @author Leemoonsoo + */ +public class Paragraph extends Job implements Serializable { + private static final transient long serialVersionUID = -6328572073497992016L; + private transient NoteInterpreterLoader replLoader; + + String title; + String text; + private Map<String, Object> config; // paragraph configs like isOpen, colWidth, etc + public final GUI settings; // form and parameter settings + + public Paragraph(JobListener listener, NoteInterpreterLoader replLoader) { + super(generateId(), listener); + this.replLoader = replLoader; + title = null; + text = null; + settings = new GUI(); + config = new HashMap<String, Object>(); + } + + private static String generateId() { + return "paragraph_" + System.currentTimeMillis() + "_" + + new Random(System.currentTimeMillis()).nextInt(); + } + + public String getText() { + return text; + } + + public void setText(String newText) { + this.text = newText; + } + + + public String getTitle() { + return title; + } + + public void setTitle(String title) { + this.title = title; + } + + public String getRequiredReplName() { + return getRequiredReplName(text); + } + + public static String getRequiredReplName(String text) { + if (text == null) { + return null; + } + + // get script head + int scriptHeadIndex = 0; + for (int i = 0; i < text.length(); i++) { + char ch = text.charAt(i); + if (ch == ' ' || ch == '\n') { + scriptHeadIndex = i; + break; + } + } + if (scriptHeadIndex == 0) { + return null; + } + String head = text.substring(0, scriptHeadIndex); + if (head.startsWith("%")) { + return head.substring(1); + } else { + return null; + } + } + + private String getScriptBody() { + return getScriptBody(text); + } + + public static String getScriptBody(String text) { + if (text == null) { + return null; + } + + String magic = getRequiredReplName(text); + if (magic == null) { + return text; + } + if (magic.length() + 2 >= text.length()) { + return ""; + } + return text.substring(magic.length() + 2); + } + + public 
NoteInterpreterLoader getNoteReplLoader() { + return replLoader; + } + + public Interpreter getRepl(String name) { + return replLoader.get(name); + } + + public List<String> completion(String buffer, int cursor) { + String replName = getRequiredReplName(buffer); + if (replName != null) { + cursor -= replName.length() + 1; + } + String body = getScriptBody(buffer); + Interpreter repl = getRepl(replName); + if (repl == null) { + return null; + } + + return repl.completion(body, cursor); + } + + public void setNoteReplLoader(NoteInterpreterLoader repls) { + this.replLoader = repls; + } + + public InterpreterResult getResult() { + return (InterpreterResult) getReturn(); + } + + @Override + public int progress() { + String replName = getRequiredReplName(); + Interpreter repl = getRepl(replName); + if (repl != null) { + return repl.getProgress(getInterpreterContext()); + } else { + return 0; + } + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + String replName = getRequiredReplName(); + Interpreter repl = getRepl(replName); + logger().info("run paragraph {} using {} " + repl, getId(), replName); + if (repl == null) { + logger().error("Can not find interpreter name " + repl); + throw new RuntimeException("Can not find interpreter for " + getRequiredReplName()); + } + + String script = getScriptBody(); + // inject form + if (repl.getFormType() == FormType.NATIVE) { + settings.clear(); + } else if (repl.getFormType() == FormType.SIMPLE) { + String scriptBody = getScriptBody(); + Map<String, Input> inputs = Input.extractSimpleQueryParam(scriptBody); // inputs will be built + // from script body + settings.setForms(inputs); + script = Input.getSimpleQuery(settings.getParams(), scriptBody); + } + logger().info("RUN : " + script); + InterpreterResult ret = repl.interpret(script, getInterpreterContext()); + return ret; + } + + @Override + protected boolean jobAbort() { + Interpreter repl = 
getRepl(getRequiredReplName()); + repl.cancel(getInterpreterContext()); + return true; + } + + private InterpreterContext getInterpreterContext() { + InterpreterContext interpreterContext = new InterpreterContext(getId(), + this.getTitle(), + this.getText(), + this.getConfig(), + this.settings); + return interpreterContext; + } + + private Logger logger() { + Logger logger = LoggerFactory.getLogger(Paragraph.class); + return logger; + } + + + public Map<String, Object> getConfig() { + return config; + } + + public void setConfig(Map<String, Object> config) { + this.config = config; + } + + public void setReturn(InterpreterResult value, Throwable t) { + setResult(value); + setException(t); + + } +}
