http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/pom.xml ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml index de4bbc8..d824dfe 100644 --- a/zeppelin-interpreter/pom.xml +++ b/zeppelin-interpreter/pom.xml @@ -1,15 +1,32 @@ <?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <artifactId>zeppelin</artifactId> - <groupId>com.nflabs.zeppelin</groupId> + <groupId>org.apache.zeppelin</groupId> <version>0.5.0-SNAPSHOT</version> </parent> - <groupId>com.nflabs.zeppelin</groupId> + <groupId>org.apache.zeppelin</groupId> <artifactId>zeppelin-interpreter</artifactId> <packaging>jar</packaging> <version>0.5.0-SNAPSHOT</version>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/display/GUI.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/display/GUI.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/display/GUI.java deleted file mode 100644 index 51ae222..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/display/GUI.java +++ /dev/null @@ -1,68 +0,0 @@ -package com.nflabs.zeppelin.display; - -import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; -import java.util.TreeMap; - -import com.nflabs.zeppelin.display.Input.ParamOption; - -/** - * Settings of a form. - * - * @author Leemoonsoo - * - */ -public class GUI implements Serializable { - - Map<String, Object> params = new HashMap<String, Object>(); // form parameters from client - Map<String, Input> forms = new TreeMap<String, Input>(); // form configuration - - public GUI() { - - } - - public void setParams(Map<String, Object> values) { - this.params = values; - } - - public Map<String, Object> getParams() { - return params; - } - - public Map<String, Input> getForms() { - return forms; - } - - public void setForms(Map<String, Input> forms) { - this.forms = forms; - } - - public Object input(String id, Object defaultValue) { - // first find values from client and then use default - Object value = params.get(id); - if (value == null) { - value = defaultValue; - } - - forms.put(id, new Input(id, defaultValue)); - return value; - } - - public Object input(String id) { - return input(id, ""); - } - - public Object select(String id, Object defaultValue, ParamOption[] options) { - Object value = params.get(id); - if (value == null) { - value = defaultValue; - } - forms.put(id, new Input(id, defaultValue, options)); - return value; - } - - public void clear() { - this.forms = new TreeMap<String, Input>(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/display/Input.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/display/Input.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/display/Input.java deleted file mode 100644 index 54ef717..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/display/Input.java +++ /dev/null @@ -1,458 +0,0 @@ -package com.nflabs.zeppelin.display; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * Input type. - * - * @author Leemoonsoo - * - */ -public class Input implements Serializable { - /** - * Parameters option. - * - * @author Leemoonsoo - * - */ - public static class ParamOption { - Object value; - String displayName; - - public ParamOption(Object value, String displayName) { - super(); - this.value = value; - this.displayName = displayName; - } - - public Object getValue() { - return value; - } - - public void setValue(Object value) { - this.value = value; - } - - public String getDisplayName() { - return displayName; - } - - public void setDisplayName(String displayName) { - this.displayName = displayName; - } - - } - - String name; - String displayName; - String type; - Object defaultValue; - ParamOption[] options; - boolean hidden; - - public Input(String name, Object defaultValue) { - this.name = name; - this.displayName = name; - this.defaultValue = defaultValue; - } - - public Input(String name, Object defaultValue, ParamOption[] options) { - this.name = name; - this.displayName = name; - this.defaultValue = defaultValue; - this.options = options; - } - - - public Input(String name, String displayName, String type, Object defaultValue, - ParamOption[] options, boolean hidden) { - super(); - this.name = name; - this.displayName = displayName; - this.type = type; - this.defaultValue = defaultValue; - this.options = options; - this.hidden = hidden; - } - - public boolean equals(Object o) { - return name.equals(((Input) o).getName()); - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getDisplayName() { - return displayName; - } - - public void setDisplayName(String displayName) { - this.displayName = displayName; - } - - public String getType() { - return type; - } - - public void setType(String type) { - this.type = type; - } - - public Object getDefaultValue() { - return defaultValue; - } - - public void setDefaultValue(Object defaultValue) { - this.defaultValue = defaultValue; - } - - public ParamOption[] getOptions() { - return options; - } - - public void setOptions(ParamOption[] options) { - this.options = options; - } - - public boolean isHidden() { - return hidden; - } - - - private static String[] getNameAndDisplayName(String str) { - Pattern p = Pattern.compile("([^(]*)\\s*[(]([^)]*)[)]"); - Matcher m = p.matcher(str.trim()); - if (m == null || m.find() == false) { - return null; - } - String[] ret = new String[2]; - ret[0] = m.group(1); - ret[1] = m.group(2); - return ret; - } - - private static String[] getType(String str) { - Pattern p = Pattern.compile("([^:]*)\\s*:\\s*(.*)"); - Matcher m = p.matcher(str.trim()); - if (m == null || m.find() == false) { - return null; - } - String[] ret = new String[2]; - ret[0] = m.group(1).trim(); - ret[1] = m.group(2).trim(); - return ret; - } - - public static Map<String, Input> extractSimpleQueryParam(String script) { - Map<String, Input> params = new HashMap<String, Input>(); - if (script == null) { - return params; - } - String replaced = script; - - Pattern pattern = Pattern.compile("([_])?[$][{]([^=}]*([=][^}]*)?)[}]"); - - Matcher match = pattern.matcher(replaced); - while (match.find()) { - String hiddenPart = match.group(1); - boolean hidden = false; - if ("_".equals(hiddenPart)) { - hidden = true; - } - String m = match.group(2); - - String namePart; - String valuePart; - - int p = m.indexOf('='); - if (p > 0) { - namePart = m.substring(0, p); - valuePart = m.substring(p + 1); - } else { - namePart = m; - valuePart = null; - } - - - String varName; - String displayName = null; - String type = null; - String defaultValue = ""; - ParamOption[] paramOptions = null; - - // get var name type - String varNamePart; - String[] typeArray = getType(namePart); - if (typeArray != null) { - type = typeArray[0]; - varNamePart = typeArray[1]; - } else { - varNamePart = namePart; - } - - // get var name and displayname - String[] varNameArray = getNameAndDisplayName(varNamePart); - if (varNameArray != null) { - varName = varNameArray[0]; - displayName = varNameArray[1]; - } else { - varName = varNamePart.trim(); - } - - // get defaultValue - if (valuePart != null) { - // find default value - int optionP = valuePart.indexOf(","); - if (optionP > 0) { // option available - defaultValue = valuePart.substring(0, optionP); - String optionPart = valuePart.substring(optionP + 1); - String[] options = Input.splitPipe(optionPart); - - paramOptions = new ParamOption[options.length]; - - for (int i = 0; i < options.length; i++) { - - String[] optNameArray = getNameAndDisplayName(options[i]); - if (optNameArray != null) { - paramOptions[i] = new ParamOption(optNameArray[0], optNameArray[1]); - } else { - paramOptions[i] = new ParamOption(options[i], null); - } - } - - - } else { // no option - defaultValue = valuePart; - } - - } - - Input param = new Input(varName, displayName, type, defaultValue, paramOptions, hidden); - params.put(varName, param); - } - - params.remove("pql"); - return params; - } - - public static String getSimpleQuery(Map<String, Object> params, String script) { - String replaced = script; - - for (String key : params.keySet()) { - Object value = params.get(key); - replaced = - replaced.replaceAll("[_]?[$][{]([^:]*[:])?" + key + "([(][^)]*[)])?(=[^}]*)?[}]", - value.toString()); - } - - Pattern pattern = Pattern.compile("[$][{]([^=}]*[=][^}]*)[}]"); - while (true) { - Matcher match = pattern.matcher(replaced); - if (match != null && match.find()) { - String m = match.group(1); - int p = m.indexOf('='); - String replacement = m.substring(p + 1); - int optionP = replacement.indexOf(","); - if (optionP > 0) { - replacement = replacement.substring(0, optionP); - } - replaced = - replaced.replaceFirst("[_]?[$][{]" - + m.replaceAll("[(]", ".").replaceAll("[)]", ".").replaceAll("[|]", ".") + "[}]", - replacement); - } else { - break; - } - } - - replaced = replaced.replace("[_]?[$][{]([^=}]*)[}]", ""); - return replaced; - } - - - public static String[] split(String str) { - return str.split(";(?=([^\"']*\"[^\"']*\")*[^\"']*$)"); - - } - - /* - * public static String [] splitPipe(String str){ //return - * str.split("\\|(?=([^\"']*\"[^\"']*\")*[^\"']*$)"); return - * str.split("\\|(?=([^\"']*\"[^\"']*\")*[^\"']*$)"); } - */ - - - public static String[] splitPipe(String str) { - return split(str, '|'); - } - - public static String[] split(String str, char split) { - return split(str, new String[] {String.valueOf(split)}, false); - } - - public static String[] split(String str, String[] splitters, boolean includeSplitter) { - String escapeSeq = "\"',;${}"; - char escapeChar = '\\'; - - String[] blockStart = new String[] {"\"", "'", "${", "N_(", "N_<"}; - String[] blockEnd = new String[] {"\"", "'", "}", "N_)", "N_>"}; - - return split(str, escapeSeq, escapeChar, blockStart, blockEnd, splitters, includeSplitter); - - } - - public static String[] split(String str, String escapeSeq, char escapeChar, String[] blockStart, - String[] blockEnd, String[] splitters, boolean includeSplitter) { - - List<String> splits = new ArrayList<String>(); - - String curString = ""; - - boolean escape = false; // true when escape char is found - int lastEscapeOffset = -1; - int blockStartPos = -1; - List<Integer> blockStack = new LinkedList<Integer>(); - - for (int i = 0; i < str.length(); i++) { - char c = str.charAt(i); - - // escape char detected - if (c == escapeChar && escape == false) { - escape = true; - continue; - } - - // escaped char comes - if (escape == true) { - if (escapeSeq.indexOf(c) < 0) { - curString += escapeChar; - } - curString += c; - escape = false; - lastEscapeOffset = curString.length(); - continue; - } - - if (blockStack.size() > 0) { // inside of block - curString += c; - // check multichar block - boolean multicharBlockDetected = false; - for (int b = 0; b < blockStart.length; b++) { - if (blockStartPos >= 0 - && getBlockStr(blockStart[b]).compareTo(str.substring(blockStartPos, i)) == 0) { - blockStack.remove(0); - blockStack.add(0, b); - multicharBlockDetected = true; - break; - } - } - - if (multicharBlockDetected == true) { - continue; - } - - // check if current block is nestable - if (isNestedBlock(blockStart[blockStack.get(0)]) == true) { - // try to find nested block start - - if (curString.substring(lastEscapeOffset + 1).endsWith( - getBlockStr(blockStart[blockStack.get(0)])) == true) { - blockStack.add(0, blockStack.get(0)); // block is started - blockStartPos = i; - continue; - } - } - - // check if block is finishing - if (curString.substring(lastEscapeOffset + 1).endsWith( - getBlockStr(blockEnd[blockStack.get(0)]))) { - // the block closer is one of the splitters (and not nested block) - if (isNestedBlock(blockEnd[blockStack.get(0)]) == false) { - for (String splitter : splitters) { - if (splitter.compareTo(getBlockStr(blockEnd[blockStack.get(0)])) == 0) { - splits.add(curString); - if (includeSplitter == true) { - splits.add(splitter); - } - curString = ""; - lastEscapeOffset = -1; - - break; - } - } - } - blockStartPos = -1; - blockStack.remove(0); - continue; - } - - } else { // not in the block - boolean splitted = false; - for (String splitter : splitters) { - // forward check for splitter - int curentLenght = i + splitter.length(); - if (splitter.compareTo(str.substring(i, Math.min(curentLenght, str.length()))) == 0) { - splits.add(curString); - if (includeSplitter == true) { - splits.add(splitter); - } - curString = ""; - lastEscapeOffset = -1; - i += splitter.length() - 1; - splitted = true; - break; - } - } - if (splitted == true) { - continue; - } - - // add char to current string - curString += c; - - // check if block is started - for (int b = 0; b < blockStart.length; b++) { - if (curString.substring(lastEscapeOffset + 1) - .endsWith(getBlockStr(blockStart[b])) == true) { - blockStack.add(0, b); // block is started - blockStartPos = i; - break; - } - } - } - } - if (curString.length() > 0) { - splits.add(curString.trim()); - } - return splits.toArray(new String[] {}); - - } - - private static String getBlockStr(String blockDef) { - if (blockDef.startsWith("N_")) { - return blockDef.substring("N_".length()); - } else { - return blockDef; - } - } - - private static boolean isNestedBlock(String blockDef) { - if (blockDef.startsWith("N_")) { - return true; - } else { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/ClassloaderInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/ClassloaderInterpreter.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/ClassloaderInterpreter.java deleted file mode 100644 index f8d8bbf..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/ClassloaderInterpreter.java +++ /dev/null @@ -1,261 +0,0 @@ -package com.nflabs.zeppelin.interpreter; - -import java.net.URL; -import java.util.List; -import java.util.Properties; - -import com.nflabs.zeppelin.scheduler.Scheduler; - -/** - * Add to the classpath interpreters. - * - */ -public class ClassloaderInterpreter - extends Interpreter - implements WrappedInterpreter { - - private ClassLoader cl; - private Interpreter intp; - - public ClassloaderInterpreter(Interpreter intp, ClassLoader cl) { - super(new Properties()); - this.cl = cl; - this.intp = intp; - } - - @Override - public Interpreter getInnerInterpreter() { - return intp; - } - - public ClassLoader getClassloader() { - return cl; - } - - @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(cl); - try { - return intp.interpret(st, context); - } catch (Exception e) { - e.printStackTrace(); - throw new InterpreterException(e); - } finally { - cl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(oldcl); - } - } - - - @Override - public void open() { - ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(cl); - try { - intp.open(); - } catch (Exception e) { - throw new InterpreterException(e); - } finally { - cl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(oldcl); - } - } - - @Override - public void close() { - ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(cl); - try { - intp.close(); - } catch (Exception e) { - throw new InterpreterException(e); - } finally { - cl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(oldcl); - } - } - - @Override - public void cancel(InterpreterContext context) { - ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(cl); - try { - intp.cancel(context); - } catch (Exception e) { - throw new InterpreterException(e); - } finally { - cl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(oldcl); - } - } - - @Override - public FormType getFormType() { - ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(cl); - try { - return intp.getFormType(); - } catch (Exception e) { - throw new InterpreterException(e); - } finally { - cl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(oldcl); - } - } - - @Override - public int getProgress(InterpreterContext context) { - ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(cl); - try { - return intp.getProgress(context); - } catch (Exception e) { - throw new InterpreterException(e); - } finally { - cl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(oldcl); - } - } - - @Override - public Scheduler getScheduler() { - ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(cl); - try { - return intp.getScheduler(); - } catch (Exception e) { - throw new InterpreterException(e); - } finally { - cl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(oldcl); - } - } - - @Override - public List<String> completion(String buf, int cursor) { - ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(cl); - try { - return intp.completion(buf, cursor); - } catch (Exception e) { - throw new InterpreterException(e); - } finally { - cl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(oldcl); - } - } - - - @Override - public String getClassName() { - ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(cl); - try { - return intp.getClassName(); - } catch (Exception e) { - throw new InterpreterException(e); - } finally { - cl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(oldcl); - } - } - - @Override - public void setInterpreterGroup(InterpreterGroup interpreterGroup) { - ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(cl); - try { - intp.setInterpreterGroup(interpreterGroup); - } catch (Exception e) { - throw new InterpreterException(e); - } finally { - cl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(oldcl); - } - } - - @Override - public InterpreterGroup getInterpreterGroup() { - ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(cl); - try { - return intp.getInterpreterGroup(); - } catch (Exception e) { - throw new InterpreterException(e); - } finally { - cl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(oldcl); - } - } - - @Override - public void setClassloaderUrls(URL [] urls) { - ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(cl); - try { - intp.setClassloaderUrls(urls); - } catch (Exception e) { - throw new InterpreterException(e); - } finally { - cl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(oldcl); - } - } - - @Override - public URL [] getClassloaderUrls() { - ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(cl); - try { - return intp.getClassloaderUrls(); - } catch (Exception e) { - throw new InterpreterException(e); - } finally { - cl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(oldcl); - } - } - - @Override - public void setProperty(Properties property) { - ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(cl); - try { - intp.setProperty(property); - } catch (Exception e) { - throw new InterpreterException(e); - } finally { - cl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(oldcl); - } - } - - @Override - public Properties getProperty() { - ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(cl); - try { - return intp.getProperty(); - } catch (Exception e) { - throw new InterpreterException(e); - } finally { - cl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(oldcl); - } - } - - @Override - public String getProperty(String key) { - ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(cl); - try { - return intp.getProperty(key); - } catch (Exception e) { - throw new InterpreterException(e); - } finally { - cl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(oldcl); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/Interpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/Interpreter.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/Interpreter.java deleted file mode 100644 index acb62a2..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/Interpreter.java +++ /dev/null @@ -1,267 +0,0 @@ -package com.nflabs.zeppelin.interpreter; - - -import java.net.URL; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.nflabs.zeppelin.scheduler.Scheduler; -import com.nflabs.zeppelin.scheduler.SchedulerFactory; - -/** - * Interface for interpreters. - * If you want to implement new Zeppelin interpreter, extend this class - * - * Please see, - * http://zeppelin.incubator.apache.org/docs/development/writingzeppelininterpreter.html - * - * open(), close(), interpreter() is three the most important method you need to implement. - * cancel(), getProgress(), completion() is good to have - * getFormType(), getScheduler() determine Zeppelin's behavior - * - */ -public abstract class Interpreter { - - /** - * Opens interpreter. You may want to place your initialize routine here. - * open() is called only once - */ - public abstract void open(); - - /** - * Closes interpreter. You may want to free your resources up here. - * close() is called only once - */ - public abstract void close(); - - /** - * Run code and return result, in synchronous way. - * - * @param st statements to run - * @param context - * @return - */ - public abstract InterpreterResult interpret(String st, InterpreterContext context); - - /** - * Optionally implement the canceling routine to abort interpret() method - * - * @param context - */ - public abstract void cancel(InterpreterContext context); - - /** - * Dynamic form handling - * see http://zeppelin.incubator.apache.org/docs/dynamicform.html - * - * @return FormType.SIMPLE enables simple pattern replacement (eg. Hello ${name=world}), - * FormType.NATIVE handles form in API - */ - public abstract FormType getFormType(); - - /** - * get interpret() method running process in percentage. - * - * @param context - * @return number between 0-100 - */ - public abstract int getProgress(InterpreterContext context); - - /** - * Get completion list based on cursor position. - * By implementing this method, it enables auto-completion. - * - * @param buf statements - * @param cursor cursor position in statements - * @return list of possible completion. Return empty list if there're nothing to return. - */ - public abstract List<String> completion(String buf, int cursor); - - /** - * Interpreter can implements it's own scheduler by overriding this method. - * There're two default scheduler provided, FIFO, Parallel. - * If your interpret() can handle concurrent request, use Parallel or use FIFO. - * - * You can get default scheduler by using - * SchedulerFactory.singleton().createOrGetFIFOScheduler() - * SchedulerFactory.singleton().createOrGetParallelScheduler() - * - * - * @return return scheduler instance. - * This method can be called multiple times and have to return the same instance. - * Can not return null. - */ - public Scheduler getScheduler() { - return SchedulerFactory.singleton().createOrGetFIFOScheduler("interpreter_" + this.hashCode()); - } - - /** - * Called when interpreter is no longer used. - */ - public void destroy() { - getScheduler().stop(); - } - - - - - - static Logger logger = LoggerFactory.getLogger(Interpreter.class); - private InterpreterGroup interpreterGroup; - private URL [] classloaderUrls; - protected Properties property; - - public Interpreter(Properties property) { - this.property = property; - } - - public void setProperty(Properties property) { - this.property = property; - } - - public Properties getProperty() { - Properties p = new Properties(); - p.putAll(property); - - Map<String, InterpreterProperty> defaultProperties = Interpreter - .findRegisteredInterpreterByClassName(getClassName()).getProperties(); - for (String k : defaultProperties.keySet()) { - if (!p.contains(k)) { - String value = defaultProperties.get(k).getDefaultValue(); - if (value != null) { - p.put(k, defaultProperties.get(k).getDefaultValue()); - } - } - } - - return property; - } - - public String getProperty(String key) { - if (property.containsKey(key)) { - return property.getProperty(key); - } - - Map<String, InterpreterProperty> defaultProperties = Interpreter - .findRegisteredInterpreterByClassName(getClassName()).getProperties(); - if (defaultProperties.containsKey(key)) { - return defaultProperties.get(key).getDefaultValue(); - } - - return null; - } - - - public String getClassName() { - return this.getClass().getName(); - } - - public void setInterpreterGroup(InterpreterGroup interpreterGroup) { - this.interpreterGroup = interpreterGroup; - } - - public InterpreterGroup getInterpreterGroup() { - return this.interpreterGroup; - } - - public URL[] getClassloaderUrls() { - return classloaderUrls; - } - - public void setClassloaderUrls(URL[] classloaderUrls) { - this.classloaderUrls = classloaderUrls; - } - - - /** - * Type of interpreter. - */ - public static enum FormType { - NATIVE, SIMPLE, NONE - } - - /** - * Represent registered interpreter class - */ - public static class RegisteredInterpreter { - private String name; - private String group; - private String className; - private Map<String, InterpreterProperty> properties; - private String path; - - public RegisteredInterpreter(String name, String group, String className, - Map<String, InterpreterProperty> properties) { - super(); - this.name = name; - this.group = group; - this.className = className; - this.properties = properties; - } - - public String getName() { - return name; - } - - public String getGroup() { - return group; - } - - public String getClassName() { - return className; - } - - public Map<String, InterpreterProperty> getProperties() { - return properties; - } - - public void setPath(String path) { - this.path = path; - } - - public String getPath() { - return path; - } - - } - - /** - * Type of Scheduling. - */ - public static enum SchedulingMode { - FIFO, PARALLEL - } - - public static Map<String, RegisteredInterpreter> registeredInterpreters = Collections - .synchronizedMap(new HashMap<String, RegisteredInterpreter>()); - - public static void register(String name, String className) { - register(name, name, className); - } - - public static void register(String name, String group, String className) { - register(name, group, className, new HashMap<String, InterpreterProperty>()); - } - - public static void register(String name, String group, String className, - Map<String, InterpreterProperty> properties) { - registeredInterpreters.put(name, new RegisteredInterpreter(name, group, className, properties)); - } - - public static RegisteredInterpreter findRegisteredInterpreterByClassName(String className) { - for (RegisteredInterpreter ri : registeredInterpreters.values()) { - if (ri.getClassName().equals(className)) { - return ri; - } - } - return null; - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterContext.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterContext.java deleted file mode 100644 index d99e8b0..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterContext.java +++ /dev/null @@ -1,51 +0,0 @@ -package com.nflabs.zeppelin.interpreter; - -import java.util.Map; - -import com.nflabs.zeppelin.display.GUI; - -/** - * Interpreter context - */ -public class InterpreterContext { - private final String paragraphTitle; - private final String paragraphId; - private final String paragraphText; - private final Map<String, Object> config; - private GUI gui; - - - public InterpreterContext(String paragraphId, - String paragraphTitle, - String paragraphText, - Map<String, Object> config, - GUI gui - ) { - this.paragraphId = paragraphId; - this.paragraphTitle = paragraphTitle; - this.paragraphText = paragraphText; - this.config = config; - this.gui = gui; - } - - public String getParagraphId() { - return paragraphId; - } - - public String getParagraphText() { - return paragraphText; - } - - public String getParagraphTitle() { - return paragraphTitle; - } - - public Map<String, Object> getConfig() { - return config; - } - - public GUI getGui() { - return gui; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterException.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterException.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterException.java deleted file mode 100644 index 8f50363..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterException.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.nflabs.zeppelin.interpreter; - -/** - * Runtime Exception for interpreters. - * - */ -public class InterpreterException extends RuntimeException { - - public InterpreterException(Throwable e) { - super(e); - } - - public InterpreterException(String m) { - super(m); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterGroup.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterGroup.java deleted file mode 100644 index ad2b348..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterGroup.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.nflabs.zeppelin.interpreter; - -import java.util.LinkedList; -import java.util.Properties; -import java.util.Random; - -/** - * InterpreterGroup is list of interpreters in the same group. - * And unit of interpreter instantiate, restart, bind, unbind. - */ -public class InterpreterGroup extends LinkedList<Interpreter>{ - String id; - - private static String generateId() { - return "InterpreterGroup_" + System.currentTimeMillis() + "_" - + new Random().nextInt(); - } - - public String getId() { - synchronized (this) { - if (id == null) { - id = generateId(); - } - return id; - } - } - - - public Properties getProperty() { - Properties p = new Properties(); - for (Interpreter intp : this) { - p.putAll(intp.getProperty()); - } - return p; - } - - public void close() { - for (Interpreter intp : this) { - intp.close(); - } - } - - public void destroy() { - for (Interpreter intp : this) { - intp.destroy(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterProperty.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterProperty.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterProperty.java deleted file mode 100644 index 63017e0..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterProperty.java +++ /dev/null @@ -1,32 +0,0 @@ -package com.nflabs.zeppelin.interpreter; - -/** - * Represent property of interpreter - */ -public class InterpreterProperty { - String defaultValue; - String description; - - public InterpreterProperty(String defaultValue, - String description) { - super(); - this.defaultValue = defaultValue; - this.description = description; - } - - public String getDefaultValue() { - return defaultValue; - } - - public void setDefaultValue(String defaultValue) { - this.defaultValue = defaultValue; - } - - public String getDescription() { - return description; - } - - public void setDescription(String description) { - this.description = description; - } -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterPropertyBuilder.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterPropertyBuilder.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterPropertyBuilder.java deleted file mode 100644 index 34aa51a..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterPropertyBuilder.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.nflabs.zeppelin.interpreter; - -import java.util.HashMap; -import java.util.Map; - -/** - * InterpreterPropertyBuilder - */ -public class InterpreterPropertyBuilder { - Map<String, InterpreterProperty> properties = new HashMap<String, InterpreterProperty>(); - - public InterpreterPropertyBuilder add(String name, String defaultValue, String description){ - properties.put(name, new InterpreterProperty(defaultValue, description)); - return this; - } - - public Map<String, InterpreterProperty> build(){ - return properties; - } -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterResult.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterResult.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterResult.java deleted file mode 100644 index 94bf673..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterResult.java +++ /dev/null @@ -1,120 +0,0 @@ -package com.nflabs.zeppelin.interpreter; - -import java.io.Serializable; - -/** - * Interpreter result template. - * - * @author Leemoonsoo - * - */ -public class InterpreterResult implements Serializable { - - /** - * Type of result after code execution. - * - * @author Leemoonsoo - * - */ - public static enum Code { - SUCCESS, - INCOMPLETE, - ERROR - } - - /** - * Type of Data. - * - * @author Leemoonsoo - * - */ - public static enum Type { - TEXT, - HTML, - TABLE, - IMG, - SVG, - NULL - } - - Code code; - Type type; - String msg; - - public InterpreterResult(Code code) { - this.code = code; - this.msg = null; - this.type = Type.TEXT; - } - - public InterpreterResult(Code code, String msg) { - this.code = code; - this.msg = getData(msg); - this.type = getType(msg); - } - - public InterpreterResult(Code code, Type type, String msg) { - this.code = code; - this.msg = msg; - this.type = type; - } - - /** - * Magic is like %html %text. - * - * @param msg - * @return - */ - private String getData(String msg) { - if (msg == null) { - return null; - } - - Type[] types = Type.values(); - for (Type t : types) { - String magic = "%" + t.name().toLowerCase(); - if (msg.startsWith(magic + " ") || msg.startsWith(magic + "\n")) { - int magicLength = magic.length() + 1; - if (msg.length() > magicLength) { - return msg.substring(magicLength); - } else { - return ""; - } - } - } - - return msg; - } - - - private Type getType(String msg) { - if (msg == null) { - return Type.TEXT; - } - Type[] types = Type.values(); - for (Type t : types) { - String magic = "%" + t.name().toLowerCase(); - if (msg.startsWith(magic + " ") || msg.startsWith(magic + "\n")) { - return t; - } - } - return Type.TEXT; - } - - public Code code() { - return code; - } - - public String message() { - return msg; - } - - public Type type() { - return type; - } - - public InterpreterResult type(Type type) { - this.type = type; - return this; - } -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterUtils.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterUtils.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterUtils.java deleted file mode 100644 index 37f9ff9..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/InterpreterUtils.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.nflabs.zeppelin.interpreter; - - -import java.lang.reflect.InvocationTargetException; - -/** - * Interpreter utility functions - */ -public class InterpreterUtils { - - public static String getMostRelevantMessage(Exception ex) { - if (ex instanceof InvocationTargetException) { - Throwable cause = ((InvocationTargetException) ex).getCause(); - if (cause != null) { - return cause.getMessage(); - } - } - String message = ex.getMessage(); - if (message == null || message == "") { - return ex.toString(); - } - return message; - } -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/LazyOpenInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/LazyOpenInterpreter.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/LazyOpenInterpreter.java deleted file mode 100644 index 753adc9..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/LazyOpenInterpreter.java +++ /dev/null @@ -1,131 +0,0 @@ -package com.nflabs.zeppelin.interpreter; - -import java.net.URL; -import java.util.List; -import java.util.Properties; - -import com.nflabs.zeppelin.scheduler.Scheduler; - -/** - * Interpreter wrapper for lazy initialization - */ -public class LazyOpenInterpreter - extends Interpreter - implements WrappedInterpreter { - private Interpreter intp; - boolean opened = false; - - public LazyOpenInterpreter(Interpreter intp) { - super(new Properties()); - this.intp = intp; - } - - @Override - public Interpreter getInnerInterpreter() { - return intp; - } - - @Override - public void setProperty(Properties property) { - intp.setProperty(property); - } - - @Override - public Properties getProperty() { - return intp.getProperty(); - } - - @Override - public String getProperty(String key) { - return intp.getProperty(key); - } - - @Override - public void open() { - if (opened == true) { - return; - } - - synchronized (intp) { - if (opened == false) { - intp.open(); - opened = true; - } - } - } - - @Override - public void close() { - synchronized (intp) { - if (opened == true) { - intp.close(); - opened = false; - } - } - } - - public boolean isOpen() { - synchronized (intp) { - return opened; - } - } - - @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - open(); - return intp.interpret(st, context); - } - - @Override - public void cancel(InterpreterContext context) { - open(); - intp.cancel(context); - } - - @Override - public FormType getFormType() { - return intp.getFormType(); - } - - @Override - public int getProgress(InterpreterContext context) { - open(); - return intp.getProgress(context); - } - - @Override - public Scheduler getScheduler() { - return intp.getScheduler(); - } - - @Override - public List<String> completion(String buf, int cursor) { - open(); - return intp.completion(buf, cursor); - } - - @Override - public String getClassName() { - return intp.getClassName(); - } - - @Override - public InterpreterGroup getInterpreterGroup() { - return intp.getInterpreterGroup(); - } - - @Override - public void setInterpreterGroup(InterpreterGroup interpreterGroup) { - intp.setInterpreterGroup(interpreterGroup); - } - - @Override - public URL [] getClassloaderUrls() { - return intp.getClassloaderUrls(); - } - - @Override - public void setClassloaderUrls(URL [] urls) { - intp.setClassloaderUrls(urls); - } -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/WrappedInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/WrappedInterpreter.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/WrappedInterpreter.java deleted file mode 100644 index 47c71ff..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/WrappedInterpreter.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.nflabs.zeppelin.interpreter; - -/** - * WrappedInterpreter - */ -public interface WrappedInterpreter { - public Interpreter getInnerInterpreter(); -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/ClientFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/ClientFactory.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/ClientFactory.java deleted file mode 100644 index 670dc2e..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/ClientFactory.java +++ /dev/null @@ -1,63 +0,0 @@ -package com.nflabs.zeppelin.interpreter.remote; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.commons.pool2.BasePooledObjectFactory; -import org.apache.commons.pool2.PooledObject; -import org.apache.commons.pool2.impl.DefaultPooledObject; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransportException; - -import com.nflabs.zeppelin.interpreter.InterpreterException; -import com.nflabs.zeppelin.interpreter.thrift.RemoteInterpreterService; -import com.nflabs.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; - -/** - * - */ -public class ClientFactory extends BasePooledObjectFactory<Client>{ - private String host; - private int port; - Map<Client, TSocket> clientSocketMap = new HashMap<Client, TSocket>(); - - public ClientFactory(String host, int port) { - this.host = host; - this.port = port; - } - - @Override - public Client create() throws Exception { - TSocket transport = new TSocket(host, port); - try { - transport.open(); - } catch (TTransportException e) { - throw new InterpreterException(e); - } - - TProtocol protocol = new TBinaryProtocol(transport); - Client client = new RemoteInterpreterService.Client(protocol); - - synchronized (clientSocketMap) { - clientSocketMap.put(client, transport); - } - return client; - } - - @Override - public PooledObject<Client> wrap(Client client) { - return new DefaultPooledObject<Client>(client); - } - - @Override - public void destroyObject(PooledObject<Client> p) { - synchronized (clientSocketMap) { - if (clientSocketMap.containsKey(p)) { - clientSocketMap.get(p).close(); - clientSocketMap.remove(p); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreter.java deleted file mode 100644 index ccae0f7..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreter.java +++ /dev/null @@ -1,330 +0,0 @@ -package com.nflabs.zeppelin.interpreter.remote; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; -import com.nflabs.zeppelin.display.GUI; -import com.nflabs.zeppelin.interpreter.Interpreter; -import com.nflabs.zeppelin.interpreter.InterpreterContext; -import com.nflabs.zeppelin.interpreter.InterpreterException; -import com.nflabs.zeppelin.interpreter.InterpreterGroup; -import com.nflabs.zeppelin.interpreter.InterpreterResult; -import com.nflabs.zeppelin.interpreter.InterpreterResult.Type; -import com.nflabs.zeppelin.interpreter.thrift.RemoteInterpreterContext; -import com.nflabs.zeppelin.interpreter.thrift.RemoteInterpreterResult; -import com.nflabs.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; -import com.nflabs.zeppelin.scheduler.Scheduler; -import com.nflabs.zeppelin.scheduler.SchedulerFactory; - -/** - * - */ -public class RemoteInterpreter extends Interpreter { - Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class); - Gson gson = new Gson(); - private String interpreterRunner; - private String interpreterPath; - private String className; - FormType formType; - boolean initialized; - private Map<String, String> env; - static Map<String, RemoteInterpreterProcess> interpreterGroupReference - = new HashMap<String, RemoteInterpreterProcess>(); - - public RemoteInterpreter(Properties property, - String className, - String interpreterRunner, - String interpreterPath) { - super(property); - - this.className = className; - initialized = false; - this.interpreterRunner = interpreterRunner; - this.interpreterPath = interpreterPath; - env = new HashMap<String, String>(); - } - - public RemoteInterpreter(Properties property, - String className, - String interpreterRunner, - String interpreterPath, - Map<String, String> env) { - super(property); - - this.className = className; - this.interpreterRunner = interpreterRunner; - this.interpreterPath = interpreterPath; - this.env = env; - } - - @Override - public String getClassName() { - return className; - } - - public RemoteInterpreterProcess getInterpreterProcess() { - synchronized (interpreterGroupReference) { - if (interpreterGroupReference.containsKey(getInterpreterGroupKey(getInterpreterGroup()))) { - RemoteInterpreterProcess interpreterProcess = interpreterGroupReference - .get(getInterpreterGroupKey(getInterpreterGroup())); - try { - return interpreterProcess; - } catch (Exception e) { - throw new InterpreterException(e); - } - } else { - throw new InterpreterException("Unexpected error"); - } - } - } - - private synchronized void init() { - if (initialized == true) { - return; - } - - RemoteInterpreterProcess interpreterProcess = null; - - synchronized (interpreterGroupReference) { - if (interpreterGroupReference.containsKey(getInterpreterGroupKey(getInterpreterGroup()))) { - interpreterProcess = interpreterGroupReference - .get(getInterpreterGroupKey(getInterpreterGroup())); - } else { - throw new InterpreterException("Unexpected error"); - } - } - - int rc = interpreterProcess.reference(); - - synchronized (interpreterProcess) { - // when first process created - if (rc == 1) { - // create all interpreter class in this interpreter group - Client client = null; - try { - client = interpreterProcess.getClient(); - } catch (Exception e1) { - throw new InterpreterException(e1); - } - - try { - for (Interpreter intp : this.getInterpreterGroup()) { - logger.info("Create remote interpreter {}", intp.getClassName()); - client.createInterpreter(intp.getClassName(), (Map) property); - - } - } catch (TException e) { - throw new InterpreterException(e); - } finally { - interpreterProcess.releaseClient(client); - } - } - } - initialized = true; - } - - - - @Override - public void open() { - init(); - } - - @Override - public void close() { - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - Client client = null; - try { - client = interpreterProcess.getClient(); - } catch (Exception e1) { - throw new InterpreterException(e1); - } - - try { - client.close(className); - } catch (TException e) { - throw new InterpreterException(e); - } finally { - interpreterProcess.releaseClient(client); - } - - interpreterProcess.dereference(); - } - - @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - FormType form = getFormType(); - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - Client client = null; - try { - client = interpreterProcess.getClient(); - } catch (Exception e1) { - throw new InterpreterException(e1); - } - - try { - GUI settings = context.getGui(); - RemoteInterpreterResult remoteResult = client.interpret(className, st, convert(context)); - - Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson( - remoteResult.getConfig(), new TypeToken<Map<String, Object>>() { - }.getType()); - context.getConfig().clear(); - context.getConfig().putAll(remoteConfig); - - if (form == FormType.NATIVE) { - GUI remoteGui = gson.fromJson(remoteResult.getGui(), GUI.class); - context.getGui().clear(); - context.getGui().setParams(remoteGui.getParams()); - context.getGui().setForms(remoteGui.getForms()); - } - - return convert(remoteResult); - } catch (TException e) { - throw new InterpreterException(e); - } finally { - interpreterProcess.releaseClient(client); - } - } - - @Override - public void cancel(InterpreterContext context) { - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - Client client = null; - try { - client = interpreterProcess.getClient(); - } catch (Exception e1) { - throw new InterpreterException(e1); - } - - try { - client.cancel(className, convert(context)); - } catch (TException e) { - throw new InterpreterException(e); - } finally { - interpreterProcess.releaseClient(client); - } - } - - - @Override - public FormType getFormType() { - init(); - - if (formType != null) { - return formType; - } - - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - Client client = null; - try { - client = interpreterProcess.getClient(); - } catch (Exception e1) { - throw new InterpreterException(e1); - } - - try { - formType = FormType.valueOf(client.getFormType(className)); - return formType; - } catch (TException e) { - throw new InterpreterException(e); - } finally { - interpreterProcess.releaseClient(client); - } - } - - @Override - public int getProgress(InterpreterContext context) { - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - Client client = null; - try { - client = interpreterProcess.getClient(); - } catch (Exception e1) { - throw new InterpreterException(e1); - } - - try { - return client.getProgress(className, convert(context)); - } catch (TException e) { - throw new InterpreterException(e); - } finally { - interpreterProcess.releaseClient(client); - } - } - - - @Override - public List<String> completion(String buf, int cursor) { - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - Client client = null; - try { - client = interpreterProcess.getClient(); - } catch (Exception e1) { - throw new InterpreterException(e1); - } - - try { - return client.completion(className, buf, cursor); - } catch (TException e) { - throw new InterpreterException(e); - } finally { - interpreterProcess.releaseClient(client); - } - } - - @Override - public Scheduler getScheduler() { - int maxConcurrency = 10; - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - return SchedulerFactory.singleton().createOrGetRemoteScheduler( - "remoteinterpreter_" + interpreterProcess.hashCode(), - getInterpreterProcess(), - maxConcurrency); - } - - @Override - public void setInterpreterGroup(InterpreterGroup interpreterGroup) { - super.setInterpreterGroup(interpreterGroup); - - synchronized (interpreterGroupReference) { - if (!interpreterGroupReference - .containsKey(getInterpreterGroupKey(interpreterGroup))) { - interpreterGroupReference.put(getInterpreterGroupKey(interpreterGroup), - new RemoteInterpreterProcess(interpreterRunner, - interpreterPath, env)); - - logger.info("setInterpreterGroup = " - + getInterpreterGroupKey(interpreterGroup) + " class=" + className - + ", path=" + interpreterPath); - } - } - } - - private String getInterpreterGroupKey(InterpreterGroup interpreterGroup) { - return interpreterGroup.getId(); - } - - private RemoteInterpreterContext convert(InterpreterContext ic) { - return new RemoteInterpreterContext( - ic.getParagraphId(), - ic.getParagraphTitle(), - ic.getParagraphText(), - gson.toJson(ic.getConfig()), - gson.toJson(ic.getGui())); - } - - private InterpreterResult convert(RemoteInterpreterResult result) { - return new InterpreterResult( - InterpreterResult.Code.valueOf(result.getCode()), - Type.valueOf(result.getType()), - result.getMsg()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterProcess.java deleted file mode 100644 index 3829618..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ /dev/null @@ -1,192 +0,0 @@ -package com.nflabs.zeppelin.interpreter.remote; - -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.commons.exec.CommandLine; -import org.apache.commons.exec.DefaultExecutor; -import org.apache.commons.exec.ExecuteException; -import org.apache.commons.exec.ExecuteResultHandler; -import org.apache.commons.exec.ExecuteWatchdog; -import org.apache.commons.exec.environment.EnvironmentUtils; -import org.apache.commons.pool2.impl.GenericObjectPool; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.nflabs.zeppelin.interpreter.InterpreterException; -import com.nflabs.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; - -/** - * - */ -public class RemoteInterpreterProcess implements ExecuteResultHandler { - Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class); - AtomicInteger referenceCount; - private DefaultExecutor executor; - private ExecuteWatchdog watchdog; - boolean running = false; - int port = -1; - private String interpreterRunner; - private String interpreterDir; - - private GenericObjectPool<Client> clientPool; - private Map<String, String> env; - - public RemoteInterpreterProcess(String intpRunner, String intpDir, Map<String, String> env) { - this.interpreterRunner = intpRunner; - this.interpreterDir = intpDir; - this.env = env; - referenceCount = new AtomicInteger(0); - } - - public int getPort() { - return port; - } - - public int reference() { - synchronized (referenceCount) { - if (executor == null) { - // start server process - try { - port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); - } catch (IOException e1) { - throw new InterpreterException(e1); - } - - - CommandLine cmdLine = CommandLine.parse(interpreterRunner); - cmdLine.addArgument("-d", false); - cmdLine.addArgument(interpreterDir, false); - cmdLine.addArgument("-p", false); - cmdLine.addArgument(Integer.toString(port), false); - - executor = new DefaultExecutor(); - - watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT); - executor.setWatchdog(watchdog); - - running = true; - try { - Map procEnv = EnvironmentUtils.getProcEnvironment(); - procEnv.putAll(env); - - logger.info("Run interpreter process {}", cmdLine); - executor.execute(cmdLine, procEnv, this); - } catch (IOException e) { - running = false; - throw new InterpreterException(e); - } - - - long startTime = System.currentTimeMillis(); - while (System.currentTimeMillis() - startTime < 5 * 1000) { - if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) { - break; - } else { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - } - } - } - - clientPool = new GenericObjectPool<Client>(new ClientFactory("localhost", port)); - } - return referenceCount.incrementAndGet(); - } - } - - public Client getClient() throws Exception { - return clientPool.borrowObject(); - } - - public void releaseClient(Client client) { - clientPool.returnObject(client); - } - - public int dereference() { - synchronized (referenceCount) { - int r = referenceCount.decrementAndGet(); - if (r == 0) { - logger.info("shutdown interpreter process"); - // first try shutdown - try { - Client client = getClient(); - client.shutdown(); - releaseClient(client); - } catch (Exception e) { - logger.error("Error", e); - watchdog.destroyProcess(); - } - - clientPool.clear(); - clientPool.close(); - - // wait for 3 sec and force kill - // remote process server.serve() loop is not always finishing gracefully - long startTime = System.currentTimeMillis(); - while (System.currentTimeMillis() - startTime < 3 * 1000) { - if (this.isRunning()) { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - } - } else { - break; - } - } - - if (isRunning()) { - logger.info("kill interpreter process"); - watchdog.destroyProcess(); - } - - executor = null; - watchdog = null; - running = false; - logger.info("Remote process terminated"); - } - return r; - } - } - - public int referenceCount() { - synchronized (referenceCount) { - return referenceCount.get(); - } - } - - @Override - public void onProcessComplete(int exitValue) { - logger.info("Interpreter process exited {}", exitValue); - running = false; - - } - - @Override - public void onProcessFailed(ExecuteException e) { - logger.info("Interpreter process failed {}", e); - running = false; - } - - public boolean isRunning() { - return running; - } - - public int getNumActiveClient() { - if (clientPool == null) { - return 0; - } else { - return clientPool.getNumActive(); - } - } - - public int getNumIdleClient() { - if (clientPool == null) { - return 0; - } else { - return clientPool.getNumIdle(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServer.java deleted file mode 100644 index 266d6fc..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ /dev/null @@ -1,325 +0,0 @@ -package com.nflabs.zeppelin.interpreter.remote; - - -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.net.URL; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.thrift.TException; -import org.apache.thrift.server.TThreadPoolServer; -import org.apache.thrift.transport.TServerSocket; -import org.apache.thrift.transport.TTransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; -import com.nflabs.zeppelin.display.GUI; -import com.nflabs.zeppelin.interpreter.ClassloaderInterpreter; -import com.nflabs.zeppelin.interpreter.Interpreter; -import com.nflabs.zeppelin.interpreter.Interpreter.FormType; -import com.nflabs.zeppelin.interpreter.InterpreterContext; -import com.nflabs.zeppelin.interpreter.InterpreterException; -import com.nflabs.zeppelin.interpreter.InterpreterGroup; -import com.nflabs.zeppelin.interpreter.InterpreterResult; -import com.nflabs.zeppelin.interpreter.LazyOpenInterpreter; -import com.nflabs.zeppelin.interpreter.thrift.RemoteInterpreterContext; -import com.nflabs.zeppelin.interpreter.thrift.RemoteInterpreterResult; -import com.nflabs.zeppelin.interpreter.thrift.RemoteInterpreterService; -import com.nflabs.zeppelin.scheduler.Job; -import com.nflabs.zeppelin.scheduler.Job.Status; -import com.nflabs.zeppelin.scheduler.JobListener; -import com.nflabs.zeppelin.scheduler.JobProgressPoller; -import com.nflabs.zeppelin.scheduler.Scheduler; - - -/** - * - */ -public class RemoteInterpreterServer - extends Thread - implements RemoteInterpreterService.Iface { - Logger logger = LoggerFactory.getLogger(RemoteInterpreterServer.class); - - InterpreterGroup interpreterGroup = new InterpreterGroup(); - Gson gson = new Gson(); - - RemoteInterpreterService.Processor<RemoteInterpreterServer> processor; - RemoteInterpreterServer handler; - private int port; - private TThreadPoolServer server; - - public RemoteInterpreterServer(int port) throws TTransportException { - this.port = port; - processor = new RemoteInterpreterService.Processor<RemoteInterpreterServer>(this); - TServerSocket serverTransport = new TServerSocket(port); - server = new TThreadPoolServer( - new TThreadPoolServer.Args(serverTransport).processor(processor)); - } - - @Override - public void run() { - logger.info("Starting remote interpreter server on port {}", port); - server.serve(); - } - - @Override - public void shutdown() throws TException { - // server.stop() does not always finish server.serve() loop - // sometimes server.serve() is hanging even after server.stop() call. - // this case, need to force kill the process - server.stop(); - } - - public int getPort() { - return port; - } - - public boolean isRunning() { - if (server == null) { - return false; - } else { - return server.isServing(); - } - } - - - public static void main(String[] args) - throws TTransportException, InterruptedException { - int port = Integer.parseInt(args[0]); - RemoteInterpreterServer remoteInterpreterServer = new RemoteInterpreterServer(port); - remoteInterpreterServer.start(); - remoteInterpreterServer.join(); - System.exit(0); - } - - - @Override - public void createInterpreter(String className, Map<String, String> properties) - throws TException { - try { - Class<Interpreter> replClass = (Class<Interpreter>) Object.class.forName(className); - Properties p = new Properties(); - p.putAll(properties); - - Constructor<Interpreter> constructor = - replClass.getConstructor(new Class[] {Properties.class}); - Interpreter repl = constructor.newInstance(p); - - ClassLoader cl = ClassLoader.getSystemClassLoader(); - repl.setClassloaderUrls(new URL[]{}); - - synchronized (interpreterGroup) { - interpreterGroup.add(new LazyOpenInterpreter( - new ClassloaderInterpreter(repl, cl))); - } - - logger.info("Instantiate interpreter {}", className); - repl.setInterpreterGroup(interpreterGroup); - } catch (ClassNotFoundException | NoSuchMethodException | SecurityException - | InstantiationException | IllegalAccessException - | IllegalArgumentException | InvocationTargetException e) { - e.printStackTrace(); - throw new TException(e); - } - } - - private Interpreter getInterpreter(String className) throws TException { - synchronized (interpreterGroup) { - for (Interpreter inp : interpreterGroup) { - if (inp.getClassName().equals(className)) { - return inp; - } - } - } - throw new TException(new InterpreterException("Interpreter instance " - + className + " not found")); - } - - @Override - public void open(String className) throws TException { - Interpreter intp = getInterpreter(className); - intp.open(); - } - - @Override - public void close(String className) throws TException { - Interpreter intp = getInterpreter(className); - intp.close(); - } - - - @Override - public RemoteInterpreterResult interpret(String className, String st, - RemoteInterpreterContext interpreterContext) throws TException { - Interpreter intp = getInterpreter(className); - InterpreterContext context = convert(interpreterContext); - - Scheduler scheduler = intp.getScheduler(); - InterpretJobListener jobListener = new InterpretJobListener(); - InterpretJob job = new InterpretJob( - interpreterContext.getParagraphId(), - "remoteInterpretJob_" + System.currentTimeMillis(), - jobListener, - JobProgressPoller.DEFAULT_INTERVAL_MSEC, - intp, - st, - context); - - scheduler.submit(job); - - while (!job.isTerminated()) { - synchronized (jobListener) { - try { - jobListener.wait(1000); - } catch (InterruptedException e) { - } - } - } - - if (job.getStatus() == Status.ERROR) { - throw new TException(job.getException()); - } else { - if (intp.getFormType() == FormType.NATIVE) { - // serialize dynamic form - - } - - return convert((InterpreterResult) job.getReturn(), - context.getConfig(), - context.getGui()); - } - } - - class InterpretJobListener implements JobListener { - - @Override - public void onProgressUpdate(Job job, int progress) { - } - - @Override - public void beforeStatusChange(Job job, Status before, Status after) { - } - - @Override - public void afterStatusChange(Job job, Status before, Status after) { - synchronized (this) { - notifyAll(); - } - } - } - - class InterpretJob extends Job { - - private Interpreter interpreter; - private String script; - private InterpreterContext context; - - public InterpretJob( - String jobId, - String jobName, - JobListener listener, - long progressUpdateIntervalMsec, - Interpreter interpreter, - String script, - InterpreterContext context) { - super(jobId, jobName, listener, progressUpdateIntervalMsec); - this.interpreter = interpreter; - this.script = script; - this.context = context; - } - - @Override - public int progress() { - return 0; - } - - @Override - public Map<String, Object> info() { - return null; - } - - @Override - protected Object jobRun() throws Throwable { - InterpreterResult result = interpreter.interpret(script, context); - return result; - } - - @Override - protected boolean jobAbort() { - return false; - } - } - - - @Override - public void cancel(String className, RemoteInterpreterContext interpreterContext) - throws TException { - Interpreter intp = getInterpreter(className); - intp.cancel(convert(interpreterContext)); - } - - @Override - public int getProgress(String className, RemoteInterpreterContext interpreterContext) - throws TException { - Interpreter intp = getInterpreter(className); - return intp.getProgress(convert(interpreterContext)); - } - - - @Override - public String getFormType(String className) throws TException { - Interpreter intp = getInterpreter(className); - return intp.getFormType().toString(); - } - - @Override - public List<String> completion(String className, String buf, int cursor) throws TException { - Interpreter intp = getInterpreter(className); - return intp.completion(buf, cursor); - } - - private InterpreterContext convert(RemoteInterpreterContext ric) { - return new InterpreterContext( - ric.getParagraphId(), - ric.getParagraphTitle(), - ric.getParagraphText(), - (Map<String, Object>) gson.fromJson(ric.getConfig(), - new TypeToken<Map<String, Object>>() {}.getType()), - gson.fromJson(ric.getGui(), GUI.class)); - } - - private RemoteInterpreterResult convert(InterpreterResult result, - Map<String, Object> config, GUI gui) { - return new RemoteInterpreterResult( - result.code().name(), - result.type().name(), - result.message(), - gson.toJson(config), - gson.toJson(gui)); - } - - @Override - public String getStatus(String jobId) - throws TException { - synchronized (interpreterGroup) { - for (Interpreter intp : interpreterGroup) { - for (Job job : intp.getScheduler().getJobsRunning()) { - if (jobId.equals(job.getId())) { - return job.getStatus().name(); - } - } - - for (Job job : intp.getScheduler().getJobsWaiting()) { - if (jobId.equals(job.getId())) { - return job.getStatus().name(); - } - } - } - } - return "Unknown"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterUtils.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterUtils.java b/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterUtils.java deleted file mode 100644 index 0c8a505..0000000 --- a/zeppelin-interpreter/src/main/java/com/nflabs/zeppelin/interpreter/remote/RemoteInterpreterUtils.java +++ /dev/null @@ -1,32 +0,0 @@ -package com.nflabs.zeppelin.interpreter.remote; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.Socket; - -/** - * - */ -public class RemoteInterpreterUtils { - public static int findRandomAvailablePortOnAllLocalInterfaces() throws IOException { - int port; - try (ServerSocket socket = new ServerSocket(0);) { - port = socket.getLocalPort(); - socket.close(); - } - return port; - } - - public static boolean checkIfRemoteEndpointAccessible(String host, int port) { - try { - Socket discover = new Socket(); - discover.setSoTimeout(1000); - discover.connect(new InetSocketAddress(host, port), 1000); - discover.close(); - return true; - } catch (IOException e) { - return false; - } - } -}
