http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java index 0ac7116..924901b 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java @@ -17,29 +17,28 @@ package org.apache.zeppelin.interpreter.remote; -import java.util.List; - -import org.apache.thrift.TException; +import com.google.gson.Gson; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.AngularObjectRegistryListener; import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.gson.Gson; +import java.util.List; /** * Proxy for AngularObjectRegistry that exists in remote interpreter process */ public class RemoteAngularObjectRegistry extends AngularObjectRegistry { Logger logger = LoggerFactory.getLogger(RemoteAngularObjectRegistry.class); - private InterpreterGroup interpreterGroup; + private ManagedInterpreterGroup interpreterGroup; public RemoteAngularObjectRegistry(String interpreterId, - AngularObjectRegistryListener listener, - InterpreterGroup interpreterGroup) { + AngularObjectRegistryListener listener, + ManagedInterpreterGroup interpreterGroup) { super(interpreterId, listener); this.interpreterGroup = interpreterGroup; } @@ -56,31 
+55,29 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry { * @param noteId * @return */ - public AngularObject addAndNotifyRemoteProcess(String name, Object o, String noteId, String - paragraphId) { - Gson gson = new Gson(); + public AngularObject addAndNotifyRemoteProcess(final String name, + final Object o, + final String noteId, + final String paragraphId) { + RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess(); if (!remoteInterpreterProcess.isRunning()) { return super.add(name, o, noteId, paragraphId, true); } - Client client = null; - boolean broken = false; - try { - client = remoteInterpreterProcess.getClient(); - client.angularObjectAdd(name, noteId, paragraphId, gson.toJson(o)); - return super.add(name, o, noteId, paragraphId, true); - } catch (TException e) { - broken = true; - logger.error("Error", e); - } catch (Exception e) { - logger.error("Error", e); - } finally { - if (client != null) { - remoteInterpreterProcess.releaseClient(client, broken); - } - } - return null; + remoteInterpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<Void>() { + @Override + public Void call(Client client) throws Exception { + Gson gson = new Gson(); + client.angularObjectAdd(name, noteId, paragraphId, gson.toJson(o)); + return null; + } + } + ); + + return super.add(name, o, noteId, paragraphId, true); + } /** @@ -91,30 +88,24 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry { * @param paragraphId * @return */ - public AngularObject removeAndNotifyRemoteProcess(String name, String noteId, String - paragraphId) { + public AngularObject removeAndNotifyRemoteProcess(final String name, + final String noteId, + final String paragraphId) { RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess(); if (remoteInterpreterProcess == null || !remoteInterpreterProcess.isRunning()) { return super.remove(name, noteId, paragraphId); } - - Client client = 
null; - boolean broken = false; - try { - client = remoteInterpreterProcess.getClient(); - client.angularObjectRemove(name, noteId, paragraphId); - return super.remove(name, noteId, paragraphId); - } catch (TException e) { - broken = true; - logger.error("Error", e); - } catch (Exception e) { - logger.error("Error", e); - } finally { - if (client != null) { - remoteInterpreterProcess.releaseClient(client, broken); + remoteInterpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<Void>() { + @Override + public Void call(Client client) throws Exception { + client.angularObjectRemove(name, noteId, paragraphId); + return null; + } } - } - return null; + ); + + return super.remove(name, noteId, paragraphId); } public void removeAllAndNotifyRemoteProcess(String noteId, String paragraphId) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 12e0caa..54bf9e1 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -17,160 +17,68 @@ package org.apache.zeppelin.interpreter.remote; -import java.util.*; - -import org.apache.commons.lang3.StringUtils; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; import org.apache.thrift.TException; +import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.GUI; -import org.apache.zeppelin.helium.ApplicationEventListener; import org.apache.zeppelin.display.Input; -import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterContextRunner; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; +import org.apache.zeppelin.scheduler.Job; +import 
org.apache.zeppelin.scheduler.RemoteScheduler; import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.scheduler.SchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; /** * Proxy for Interpreter instance that runs on separate process */ public class RemoteInterpreter extends Interpreter { - private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class); - - private final RemoteInterpreterProcessListener remoteInterpreterProcessListener; - private final ApplicationEventListener applicationEventListener; - private Gson gson = new Gson(); - private String interpreterRunner; - private String interpreterPath; - private String localRepoPath; + private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreter.class); + private static final Gson gson = new Gson(); + + private String className; - private String sessionKey; - private FormType formType; - private boolean initialized; - private Map<String, String> env; - private int connectTimeout; - private int maxPoolSize; - private String host; - private int port; + private String sessionId; private String userName; - private Boolean isUserImpersonate; - private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT; - private String interpreterGroupName; - - /** - * Remote interpreter and manage interpreter process - */ - public RemoteInterpreter(Properties property, String sessionKey, String className, - String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout, - int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener, - ApplicationEventListener appListener, String userName, Boolean isUserImpersonate, - int outputLimit, String interpreterGroupName) { - super(property); - this.sessionKey = sessionKey; - 
this.className = className; - initialized = false; - this.interpreterRunner = interpreterRunner; - this.interpreterPath = interpreterPath; - this.localRepoPath = localRepoPath; - env = getEnvFromInterpreterProperty(property); - this.connectTimeout = connectTimeout; - this.maxPoolSize = maxPoolSize; - this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; - this.applicationEventListener = appListener; - this.userName = userName; - this.isUserImpersonate = isUserImpersonate; - this.outputLimit = outputLimit; - this.interpreterGroupName = interpreterGroupName; - } + private FormType formType; + private RemoteInterpreterProcess interpreterProcess; + private volatile boolean isOpened = false; + private volatile boolean isCreated = false; /** - * Connect to existing process + * Remote interpreter and manage interpreter process */ - public RemoteInterpreter(Properties property, String sessionKey, String className, String host, - int port, String localRepoPath, int connectTimeout, int maxPoolSize, - RemoteInterpreterProcessListener remoteInterpreterProcessListener, - ApplicationEventListener appListener, String userName, Boolean isUserImpersonate, - int outputLimit) { - super(property); - this.sessionKey = sessionKey; + public RemoteInterpreter(Properties properties, + String sessionId, + String className, + String userName) { + super(properties); + this.sessionId = sessionId; this.className = className; - initialized = false; - this.host = host; - this.port = port; - this.localRepoPath = localRepoPath; - this.connectTimeout = connectTimeout; - this.maxPoolSize = maxPoolSize; - this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; - this.applicationEventListener = appListener; this.userName = userName; - this.isUserImpersonate = isUserImpersonate; - this.outputLimit = outputLimit; - } - - - // VisibleForTesting - public RemoteInterpreter(Properties property, String sessionKey, String className, - String interpreterRunner, String 
interpreterPath, String localRepoPath, - Map<String, String> env, int connectTimeout, - RemoteInterpreterProcessListener remoteInterpreterProcessListener, - ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) { - super(property); - this.className = className; - this.sessionKey = sessionKey; - this.interpreterRunner = interpreterRunner; - this.interpreterPath = interpreterPath; - this.localRepoPath = localRepoPath; - env.putAll(getEnvFromInterpreterProperty(property)); - this.env = env; - this.connectTimeout = connectTimeout; - this.maxPoolSize = 10; - this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; - this.applicationEventListener = appListener; - this.userName = userName; - this.isUserImpersonate = isUserImpersonate; - } - - private Map<String, String> getEnvFromInterpreterProperty(Properties property) { - Map<String, String> env = new HashMap<String, String>(); - StringBuilder sparkConfBuilder = new StringBuilder(); - for (String key : property.stringPropertyNames()) { - if (RemoteInterpreterUtils.isEnvString(key)) { - env.put(key, property.getProperty(key)); - } - if (key.equals("master")) { - sparkConfBuilder.append(" --master " + property.getProperty("master")); - } - if (isSparkConf(key, property.getProperty(key))) { - sparkConfBuilder.append(" --conf " + key + "=" + - toShellFormat(property.getProperty(key))); - } - } - env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString()); - return env; } - private String toShellFormat(String value) { - if (value.contains("\'") && value.contains("\"")) { - throw new RuntimeException("Spark property value could not contain both \" and '"); - } else if (value.contains("\'")) { - return "\"" + value + "\""; - } else { - return "\'" + value + "\'"; - } - } - - static boolean isSparkConf(String key, String value) { - return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value); + public boolean isOpened() { + return isOpened; } @Override 
@@ -178,202 +86,113 @@ public class RemoteInterpreter extends Interpreter { return className; } - private boolean connectToExistingProcess() { - return host != null && port > 0; + public String getSessionId() { + return this.sessionId; } - public RemoteInterpreterProcess getInterpreterProcess() { - InterpreterGroup intpGroup = getInterpreterGroup(); - if (intpGroup == null) { - return null; + public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() { + if (this.interpreterProcess != null) { + return this.interpreterProcess; } - - synchronized (intpGroup) { - if (intpGroup.getRemoteInterpreterProcess() == null) { - RemoteInterpreterProcess remoteProcess; - if (connectToExistingProcess()) { - remoteProcess = new RemoteInterpreterRunningProcess( - connectTimeout, - remoteInterpreterProcessListener, - applicationEventListener, - host, - port); - } else { - // create new remote process - remoteProcess = new RemoteInterpreterManagedProcess( - interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout, - remoteInterpreterProcessListener, applicationEventListener, interpreterGroupName); - } - - intpGroup.setRemoteInterpreterProcess(remoteProcess); + ManagedInterpreterGroup intpGroup = getInterpreterGroup(); + this.interpreterProcess = intpGroup.getOrCreateInterpreterProcess(); + synchronized (interpreterProcess) { + if (!interpreterProcess.isRunning()) { + interpreterProcess.start(userName, false); + interpreterProcess.getRemoteInterpreterEventPoller() + .setInterpreterProcess(interpreterProcess); + interpreterProcess.getRemoteInterpreterEventPoller().setInterpreterGroup(intpGroup); + interpreterProcess.getRemoteInterpreterEventPoller().start(); } - - return intpGroup.getRemoteInterpreterProcess(); } + return interpreterProcess; } - public synchronized void init() { - if (initialized == true) { - return; - } - - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - - final InterpreterGroup interpreterGroup = 
getInterpreterGroup(); - - interpreterProcess.setMaxPoolSize( - Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize())); - String groupId = interpreterGroup.getId(); - - synchronized (interpreterProcess) { - Client client = null; - try { - client = interpreterProcess.getClient(); - } catch (Exception e1) { - throw new InterpreterException(e1); - } - - boolean broken = false; - try { - logger.info("Create remote interpreter {}", getClassName()); - if (localRepoPath != null) { - property.put("zeppelin.interpreter.localRepo", localRepoPath); - } + public ManagedInterpreterGroup getInterpreterGroup() { + return (ManagedInterpreterGroup) super.getInterpreterGroup(); + } - property.put("zeppelin.interpreter.output.limit", Integer.toString(outputLimit)); - client.createInterpreter(groupId, sessionKey, - getClassName(), (Map) property, userName); - // Push angular object loaded from JSON file to remote interpreter - if (!interpreterGroup.isAngularRegistryPushed()) { - pushAngularObjectRegistryToRemote(client); - interpreterGroup.setAngularRegistryPushed(true); + @Override + public void open() { + synchronized (this) { + if (!isOpened) { + // create all the interpreters of the same session first, then Open the internal interpreter + // of this RemoteInterpreter. + // The why we we create all the interpreter of the session is because some interpreter + // depends on other interpreter. e.g. PySparkInterpreter depends on SparkInterpreter. 
+ // also see method Interpreter.getInterpreterInTheSameSessionByClassName + for (Interpreter interpreter : getInterpreterGroup() + .getOrCreateSession(userName, sessionId)) { + ((RemoteInterpreter) interpreter).internal_create(); } - } catch (TException e) { - logger.error("Failed to create interpreter: {}", getClassName()); - throw new InterpreterException(e); - } finally { - // TODO(jongyoul): Fixed it when not all of interpreter in same interpreter group are broken - interpreterProcess.releaseClient(client, broken); + interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() { + @Override + public Void call(Client client) throws Exception { + LOGGER.info("Open RemoteInterpreter {}", getClassName()); + // open interpreter here instead of in the jobRun method in RemoteInterpreterServer + // client.open(sessionId, className); + // Push angular object loaded from JSON file to remote interpreter + synchronized (getInterpreterGroup()) { + if (!getInterpreterGroup().isAngularRegistryPushed()) { + pushAngularObjectRegistryToRemote(client); + getInterpreterGroup().setAngularRegistryPushed(true); + } + } + return null; + } + }); + isOpened = true; } } - initialized = true; } - - @Override - public void open() { - InterpreterGroup interpreterGroup = getInterpreterGroup(); - - synchronized (interpreterGroup) { - // initialize all interpreters in this interpreter group - List<Interpreter> interpreters = interpreterGroup.get(sessionKey); - // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however, - // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it - // doesn't call open method, it's not open. It causes problem while running intp.close() - // In case of Spark, this method initializes all of interpreters and init() method increases - // reference count of RemoteInterpreterProcess. 
But while closing this interpreter group, all - // other interpreters doesn't do anything because those LazyInterpreters aren't open. - // But for now, we have to initialise all of interpreters for some reasons. - // See Interpreter.getInterpreterInTheSameSessionByClassName(String) - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - if (!initialized) { - // reference per session - interpreterProcess.reference(interpreterGroup, userName, isUserImpersonate); - } - for (Interpreter intp : new ArrayList<>(interpreters)) { - Interpreter p = intp; - while (p instanceof WrappedInterpreter) { - p = ((WrappedInterpreter) p).getInnerInterpreter(); - } - try { - ((RemoteInterpreter) p).init(); - } catch (InterpreterException e) { - logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup", - p.getClassName()); - interpreters.remove(p); - } + private void internal_create() { + synchronized (this) { + if (!isCreated) { + RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess(); + interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() { + @Override + public Void call(Client client) throws Exception { + LOGGER.info("Create RemoteInterpreter {}", getClassName()); + client.createInterpreter(getInterpreterGroup().getId(), sessionId, + className, (Map) property, userName); + return null; + } + }); + isCreated = true; } } } + @Override public void close() { - InterpreterGroup interpreterGroup = getInterpreterGroup(); - synchronized (interpreterGroup) { - // close all interpreters in this session - List<Interpreter> interpreters = interpreterGroup.get(sessionKey); - // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however, - // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it - // doesn't call open method, it's not open. 
It causes problem while running intp.close() - // In case of Spark, this method initializes all of interpreters and init() method increases - // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all - // other interpreters doesn't do anything because those LazyInterpreters aren't open. - // But for now, we have to initialise all of interpreters for some reasons. - // See Interpreter.getInterpreterInTheSameSessionByClassName(String) - if (initialized) { - // dereference per session - getInterpreterProcess().dereference(); - } - for (Interpreter intp : new ArrayList<>(interpreters)) { - Interpreter p = intp; - while (p instanceof WrappedInterpreter) { - p = ((WrappedInterpreter) p).getInnerInterpreter(); - } - try { - ((RemoteInterpreter) p).closeInterpreter(); - } catch (InterpreterException e) { - logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup", - p.getClassName()); - interpreters.remove(p); + if (isOpened) { + RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess(); + interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() { + @Override + public Void call(Client client) throws Exception { + client.close(sessionId, className); + return null; } - } - } - } - - public void closeInterpreter() { - if (this.initialized == false) { - return; - } - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - Client client = null; - boolean broken = false; - try { - client = interpreterProcess.getClient(); - if (client != null) { - client.close(sessionKey, className); - } - } catch (TException e) { - broken = true; - throw new InterpreterException(e); - } catch (Exception e1) { - throw new InterpreterException(e1); - } finally { - if (client != null) { - interpreterProcess.releaseClient(client, broken); - } - this.initialized = false; + }); + isOpened = false; + } else { + LOGGER.warn("close is called when RemoterInterpreter is not 
opened for " + className); } } @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - if (logger.isDebugEnabled()) { - logger.debug("st:\n{}", st); - } - - FormType form = getFormType(); - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - Client client = null; - try { - client = interpreterProcess.getClient(); - } catch (Exception e1) { - throw new InterpreterException(e1); + public InterpreterResult interpret(final String st, final InterpreterContext context) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("st:\n{}", st); } + final FormType form = getFormType(); + RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess(); InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess .getInterpreterContextRunnerPool(); - List<InterpreterContextRunner> runners = context.getRunners(); if (runners != null && runners.size() != 0) { // assume all runners in this InterpreterContext have the same note id @@ -382,165 +201,153 @@ public class RemoteInterpreter extends Interpreter { interpreterContextRunnerPool.clear(noteId); interpreterContextRunnerPool.addAll(noteId, runners); } + return interpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<InterpreterResult>() { + @Override + public InterpreterResult call(Client client) throws Exception { + + RemoteInterpreterResult remoteResult = client.interpret( + sessionId, className, st, convert(context)); + Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson( + remoteResult.getConfig(), new TypeToken<Map<String, Object>>() { + }.getType()); + context.getConfig().clear(); + context.getConfig().putAll(remoteConfig); + GUI currentGUI = context.getGui(); + if (form == FormType.NATIVE) { + GUI remoteGui = GUI.fromJson(remoteResult.getGui()); + currentGUI.clear(); + currentGUI.setParams(remoteGui.getParams()); + currentGUI.setForms(remoteGui.getForms()); + } else if (form == FormType.SIMPLE) { 
+ final Map<String, Input> currentForms = currentGUI.getForms(); + final Map<String, Object> currentParams = currentGUI.getParams(); + final GUI remoteGUI = GUI.fromJson(remoteResult.getGui()); + final Map<String, Input> remoteForms = remoteGUI.getForms(); + final Map<String, Object> remoteParams = remoteGUI.getParams(); + currentForms.putAll(remoteForms); + currentParams.putAll(remoteParams); + } + + InterpreterResult result = convert(remoteResult); + return result; + } + } + ); - boolean broken = false; - try { - - final GUI currentGUI = context.getGui(); - RemoteInterpreterResult remoteResult = client.interpret( - sessionKey, className, st, convert(context)); - - Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson( - remoteResult.getConfig(), new TypeToken<Map<String, Object>>() { - }.getType()); - context.getConfig().clear(); - context.getConfig().putAll(remoteConfig); - - if (form == FormType.NATIVE) { - GUI remoteGui = GUI.fromJson(remoteResult.getGui()); - currentGUI.clear(); - currentGUI.setParams(remoteGui.getParams()); - currentGUI.setForms(remoteGui.getForms()); - } else if (form == FormType.SIMPLE) { - final Map<String, Input> currentForms = currentGUI.getForms(); - final Map<String, Object> currentParams = currentGUI.getParams(); - final GUI remoteGUI = GUI.fromJson(remoteResult.getGui()); - final Map<String, Input> remoteForms = remoteGUI.getForms(); - final Map<String, Object> remoteParams = remoteGUI.getParams(); - currentForms.putAll(remoteForms); - currentParams.putAll(remoteParams); - } - - InterpreterResult result = convert(remoteResult); - return result; - } catch (TException e) { - broken = true; - throw new InterpreterException(e); - } finally { - interpreterProcess.releaseClient(client, broken); - } } @Override - public void cancel(InterpreterContext context) { - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - Client client = null; - try { - client = interpreterProcess.getClient(); - } catch 
(Exception e1) { - throw new InterpreterException(e1); - } - - boolean broken = false; - try { - client.cancel(sessionKey, className, convert(context)); - } catch (TException e) { - broken = true; - throw new InterpreterException(e); - } finally { - interpreterProcess.releaseClient(client, broken); + public void cancel(final InterpreterContext context) { + if (!isOpened) { + LOGGER.warn("Cancel is called when RemoterInterpreter is not opened for " + className); + return; } + RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess(); + interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() { + @Override + public Void call(Client client) throws Exception { + client.cancel(sessionId, className, convert(context)); + return null; + } + }); } @Override public FormType getFormType() { - open(); - if (formType != null) { return formType; } - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - Client client = null; - try { - client = interpreterProcess.getClient(); - } catch (Exception e1) { - throw new InterpreterException(e1); - } - - boolean broken = false; - try { - formType = FormType.valueOf(client.getFormType(sessionKey, className)); - return formType; - } catch (TException e) { - broken = true; - throw new InterpreterException(e); - } finally { - interpreterProcess.releaseClient(client, broken); + // it is possible to call getFormType before it is opened + synchronized (this) { + if (!isOpened) { + open(); + } } + RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess(); + FormType type = interpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<FormType>() { + @Override + public FormType call(Client client) throws Exception { + formType = FormType.valueOf(client.getFormType(sessionId, className)); + return formType; + } + }); + return type; } @Override - public int getProgress(InterpreterContext context) { - RemoteInterpreterProcess 
interpreterProcess = getInterpreterProcess(); - if (interpreterProcess == null || !interpreterProcess.isRunning()) { + public int getProgress(final InterpreterContext context) { + if (!isOpened) { + LOGGER.warn("getProgress is called when RemoterInterpreter is not opened for " + className); return 0; } - - Client client = null; - try { - client = interpreterProcess.getClient(); - } catch (Exception e1) { - throw new InterpreterException(e1); - } - - boolean broken = false; - try { - return client.getProgress(sessionKey, className, convert(context)); - } catch (TException e) { - broken = true; - throw new InterpreterException(e); - } finally { - interpreterProcess.releaseClient(client, broken); - } + RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess(); + return interpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<Integer>() { + @Override + public Integer call(Client client) throws Exception { + return client.getProgress(sessionId, className, convert(context)); + } + }); } @Override - public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - Client client = null; - try { - client = interpreterProcess.getClient(); - } catch (Exception e1) { - throw new InterpreterException(e1); + public List<InterpreterCompletion> completion(final String buf, final int cursor, + final InterpreterContext interpreterContext) { + if (!isOpened) { + LOGGER.warn("completion is called when RemoterInterpreter is not opened for " + className); + return new ArrayList<>(); } + RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess(); + return interpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<List<InterpreterCompletion>>() { + @Override + public List<InterpreterCompletion> call(Client client) throws Exception { + return client.completion(sessionId, 
className, buf, cursor, + convert(interpreterContext)); + } + }); + } - boolean broken = false; - try { - List completion = client.completion(sessionKey, className, buf, cursor, - convert(interpreterContext)); - return completion; - } catch (TException e) { - broken = true; - throw new InterpreterException(e); - } finally { - interpreterProcess.releaseClient(client, broken); + public String getStatus(final String jobId) { + if (!isOpened) { + LOGGER.warn("getStatus is called when RemoteInterpreter is not opened for " + className); + return Job.Status.UNKNOWN.name(); } + RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess(); + return interpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<String>() { + @Override + public String call(Client client) throws Exception { + return client.getStatus(sessionId, jobId); + } + }); } + //TODO(zjffdu) Share the Scheduler in the same session or in the same InterpreterGroup ? @Override public Scheduler getScheduler() { - int maxConcurrency = maxPoolSize; - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - if (interpreterProcess == null) { - return null; - } else { - return SchedulerFactory.singleton().createOrGetRemoteScheduler( - RemoteInterpreter.class.getName() + sessionKey + interpreterProcess.hashCode(), - sessionKey, interpreterProcess, maxConcurrency); - } - } - - private String getInterpreterGroupKey(InterpreterGroup interpreterGroup) { - return interpreterGroup.getId(); + int maxConcurrency = Integer.parseInt( + property.getProperty("zeppelin.interpreter.max.poolsize", + ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE.getIntValue() + "")); + + Scheduler s = new RemoteScheduler( + RemoteInterpreter.class.getName() + "-" + sessionId, + SchedulerFactory.singleton().getExecutor(), + sessionId, + this, + SchedulerFactory.singleton(), + maxConcurrency); + return SchedulerFactory.singleton().createOrGetScheduler(s); } private 
RemoteInterpreterContext convert(InterpreterContext ic) { return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), ic.getReplName(), - ic.getParagraphTitle(), ic.getParagraphText(), ic.getAuthenticationInfo().toJson(), - gson.toJson(ic.getConfig()), ic.getGui().toJson(), gson.toJson(ic.getRunners())); + ic.getParagraphTitle(), ic.getParagraphText(), gson.toJson(ic.getAuthenticationInfo()), + gson.toJson(ic.getConfig()), gson.toJson(ic.getGui()), gson.toJson(ic.getRunners())); } private InterpreterResult convert(RemoteInterpreterResult result) { @@ -557,41 +364,25 @@ public class RemoteInterpreter extends Interpreter { /** * Push local angular object registry to * remote interpreter. This method should be - * call ONLY inside the init() method + * call ONLY once when the first Interpreter is created */ - void pushAngularObjectRegistryToRemote(Client client) throws TException { + private void pushAngularObjectRegistryToRemote(Client client) throws TException { final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup() .getAngularObjectRegistry(); - if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) { final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry .getRegistry(); - - logger.info("Push local angular object registry from ZeppelinServer to" + + LOGGER.info("Push local angular object registry from ZeppelinServer to" + " remote interpreter group {}", this.getInterpreterGroup().getId()); - final java.lang.reflect.Type registryType = new TypeToken<Map<String, Map<String, AngularObject>>>() { }.getType(); - - Gson gson = new Gson(); client.angularRegistryPush(gson.toJson(registry, registryType)); } } - public Map<String, String> getEnv() { - return env; - } - - public void addEnv(Map<String, String> env) { - if (this.env == null) { - this.env = new HashMap<>(); - } - this.env.putAll(env); - } - - //Only for test - public String getInterpreterRunner() { - return 
interpreterRunner; + @Override + public String toString() { + return "RemoteInterpreter_" + className + "_" + sessionId; } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java new file mode 100644 index 0000000..ca23bcf --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -0,0 +1,525 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.interpreter.remote; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import org.apache.thrift.TException; +import org.apache.zeppelin.display.AngularObject; +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.helium.ApplicationEventListener; +import org.apache.zeppelin.interpreter.InterpreterContextRunner; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; +import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; +import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner; +import org.apache.zeppelin.resource.Resource; +import org.apache.zeppelin.resource.ResourceId; +import org.apache.zeppelin.resource.ResourcePool; +import org.apache.zeppelin.resource.ResourceSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Processes message from RemoteInterpreter process + */ +public class RemoteInterpreterEventPoller extends Thread { + private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class); + private final ScheduledExecutorService appendService = + Executors.newSingleThreadScheduledExecutor(); + private final RemoteInterpreterProcessListener listener; + private final ApplicationEventListener appListener; + + private volatile boolean shutdown; + + 
private RemoteInterpreterProcess interpreterProcess; + private ManagedInterpreterGroup interpreterGroup; + + Gson gson = new Gson(); + + public RemoteInterpreterEventPoller( + RemoteInterpreterProcessListener listener, + ApplicationEventListener appListener) { + this.listener = listener; + this.appListener = appListener; + shutdown = false; + } + + public void setInterpreterProcess(RemoteInterpreterProcess interpreterProcess) { + this.interpreterProcess = interpreterProcess; + } + + public void setInterpreterGroup(ManagedInterpreterGroup interpreterGroup) { + this.interpreterGroup = interpreterGroup; + } + + @Override + public void run() { + AppendOutputRunner runner = new AppendOutputRunner(listener); + ScheduledFuture<?> appendFuture = appendService.scheduleWithFixedDelay( + runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS); + + while (!shutdown) { + // wait and retry + if (!interpreterProcess.isRunning()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // nothing to do + } + continue; + } + + RemoteInterpreterEvent event = interpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<RemoteInterpreterEvent>() { + @Override + public RemoteInterpreterEvent call(Client client) throws Exception { + return client.getEvent(); + } + } + ); + + AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry(); + + try { + if (event.getType() != RemoteInterpreterEventType.NO_OP) { + logger.debug("Receive message from RemoteInterpreter Process: " + event.toString()); + } + if (event.getType() == RemoteInterpreterEventType.NO_OP) { + continue; + } else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_ADD) { + AngularObject angularObject = AngularObject.fromJson(event.getData()); + angularObjectRegistry.add(angularObject.getName(), + angularObject.get(), angularObject.getNoteId(), angularObject.getParagraphId()); + } else if (event.getType() == 
RemoteInterpreterEventType.ANGULAR_OBJECT_UPDATE) { + AngularObject angularObject = AngularObject.fromJson(event.getData()); + AngularObject localAngularObject = angularObjectRegistry.get( + angularObject.getName(), angularObject.getNoteId(), angularObject.getParagraphId()); + if (localAngularObject instanceof RemoteAngularObject) { + // to avoid ping-pong loop + ((RemoteAngularObject) localAngularObject).set( + angularObject.get(), true, false); + } else { + localAngularObject.set(angularObject.get()); + } + } else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE) { + AngularObject angularObject = AngularObject.fromJson(event.getData()); + angularObjectRegistry.remove(angularObject.getName(), angularObject.getNoteId(), + angularObject.getParagraphId()); + } else if (event.getType() == RemoteInterpreterEventType.RUN_INTERPRETER_CONTEXT_RUNNER) { + InterpreterContextRunner runnerFromRemote = gson.fromJson( + event.getData(), RemoteInterpreterContextRunner.class); + + listener.onRemoteRunParagraph( + runnerFromRemote.getNoteId(), runnerFromRemote.getParagraphId()); + + } else if (event.getType() == RemoteInterpreterEventType.RESOURCE_POOL_GET_ALL) { + ResourceSet resourceSet = getAllResourcePoolExcept(); + sendResourcePoolResponseGetAll(resourceSet); + } else if (event.getType() == RemoteInterpreterEventType.RESOURCE_GET) { + String resourceIdString = event.getData(); + ResourceId resourceId = ResourceId.fromJson(resourceIdString); + logger.debug("RESOURCE_GET {} {}", resourceId.getResourcePoolId(), resourceId.getName()); + Object o = getResource(resourceId); + sendResourceResponseGet(resourceId, o); + } else if (event.getType() == RemoteInterpreterEventType.RESOURCE_INVOKE_METHOD) { + String message = event.getData(); + InvokeResourceMethodEventMessage invokeMethodMessage = + InvokeResourceMethodEventMessage.fromJson(message); + Object ret = invokeResourceMethod(invokeMethodMessage); + sendInvokeMethodResult(invokeMethodMessage, ret); + } else 
if (event.getType() == RemoteInterpreterEventType.OUTPUT_APPEND) { + // on output append + Map<String, String> outputAppend = gson.fromJson( + event.getData(), new TypeToken<Map<String, Object>>() {}.getType()); + String noteId = (String) outputAppend.get("noteId"); + String paragraphId = (String) outputAppend.get("paragraphId"); + int index = Integer.parseInt(outputAppend.get("index")); + String outputToAppend = (String) outputAppend.get("data"); + + String appId = (String) outputAppend.get("appId"); + + if (appId == null) { + runner.appendBuffer(noteId, paragraphId, index, outputToAppend); + } else { + appListener.onOutputAppend(noteId, paragraphId, index, appId, outputToAppend); + } + } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE_ALL) { + Map<String, Object> outputUpdate = gson.fromJson( + event.getData(), new TypeToken<Map<String, Object>>() {}.getType()); + String noteId = (String) outputUpdate.get("noteId"); + String paragraphId = (String) outputUpdate.get("paragraphId"); + + // clear the output + List<Map<String, String>> messages = + (List<Map<String, String>>) outputUpdate.get("messages"); + + if (messages != null) { + listener.onOutputClear(noteId, paragraphId); + for (int i = 0; i < messages.size(); i++) { + Map<String, String> m = messages.get(i); + InterpreterResult.Type type = + InterpreterResult.Type.valueOf((String) m.get("type")); + String outputToUpdate = (String) m.get("data"); + + listener.onOutputUpdated(noteId, paragraphId, i, type, outputToUpdate); + } + } + } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE) { + // on output update + Map<String, String> outputAppend = gson.fromJson( + event.getData(), new TypeToken<Map<String, Object>>() {}.getType()); + String noteId = (String) outputAppend.get("noteId"); + String paragraphId = (String) outputAppend.get("paragraphId"); + int index = Integer.parseInt(outputAppend.get("index")); + InterpreterResult.Type type = + 
InterpreterResult.Type.valueOf((String) outputAppend.get("type")); + String outputToUpdate = (String) outputAppend.get("data"); + String appId = (String) outputAppend.get("appId"); + + if (appId == null) { + listener.onOutputUpdated(noteId, paragraphId, index, type, outputToUpdate); + } else { + appListener.onOutputUpdated(noteId, paragraphId, index, appId, type, outputToUpdate); + } + } else if (event.getType() == RemoteInterpreterEventType.APP_STATUS_UPDATE) { + // on output update + Map<String, String> appStatusUpdate = gson.fromJson( + event.getData(), new TypeToken<Map<String, String>>() {}.getType()); + + String noteId = appStatusUpdate.get("noteId"); + String paragraphId = appStatusUpdate.get("paragraphId"); + String appId = appStatusUpdate.get("appId"); + String status = appStatusUpdate.get("status"); + + appListener.onStatusChange(noteId, paragraphId, appId, status); + } else if (event.getType() == RemoteInterpreterEventType.REMOTE_ZEPPELIN_SERVER_RESOURCE) { + RemoteZeppelinServerResource reqResourceBody = RemoteZeppelinServerResource.fromJson( + event.getData()); + progressRemoteZeppelinControlEvent( + reqResourceBody.getResourceType(), listener, reqResourceBody); + + } else if (event.getType() == RemoteInterpreterEventType.META_INFOS) { + Map<String, String> metaInfos = gson.fromJson(event.getData(), + new TypeToken<Map<String, String>>() { + }.getType()); + String settingId = RemoteInterpreterUtils. + getInterpreterSettingId(interpreterGroup.getId()); + listener.onMetaInfosReceived(settingId, metaInfos); + } else if (event.getType() == RemoteInterpreterEventType.PARA_INFOS) { + Map<String, String> paraInfos = gson.fromJson(event.getData(), + new TypeToken<Map<String, String>>() { + }.getType()); + String noteId = paraInfos.get("noteId"); + String paraId = paraInfos.get("paraId"); + String settingId = RemoteInterpreterUtils. 
+ getInterpreterSettingId(interpreterGroup.getId()); + if (noteId != null && paraId != null && settingId != null) { + listener.onParaInfosReceived(noteId, paraId, settingId, paraInfos); + } + } + logger.debug("Event from remote process {}", event.getType()); + } catch (Exception e) { + logger.error("Can't handle event " + event, e); + } + } + try { + clearUnreadEvents(interpreterProcess.getClient()); + } catch (Exception e1) { + logger.error("Can't get RemoteInterpreterEvent", e1); + } + if (appendFuture != null) { + appendFuture.cancel(true); + } + } + + private void clearUnreadEvents(Client client) throws TException { + while (client.getEvent().getType() != RemoteInterpreterEventType.NO_OP) {} + } + + private void progressRemoteZeppelinControlEvent( + RemoteZeppelinServerResource.Type resourceType, + RemoteInterpreterProcessListener remoteWorksEventListener, + RemoteZeppelinServerResource reqResourceBody) throws Exception { + boolean broken = false; + final Gson gson = new Gson(); + final String eventOwnerKey = reqResourceBody.getOwnerKey(); + try { + if (resourceType == RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS) { + final List<ZeppelinServerResourceParagraphRunner> remoteRunners = new LinkedList<>(); + + ZeppelinServerResourceParagraphRunner reqRunnerContext = + new ZeppelinServerResourceParagraphRunner(); + + Map<String, Object> reqResourceMap = (Map<String, Object>) reqResourceBody.getData(); + String noteId = (String) reqResourceMap.get("noteId"); + String paragraphId = (String) reqResourceMap.get("paragraphId"); + + reqRunnerContext.setNoteId(noteId); + reqRunnerContext.setParagraphId(paragraphId); + + RemoteInterpreterProcessListener.RemoteWorksEventListener callBackEvent = + new RemoteInterpreterProcessListener.RemoteWorksEventListener() { + + @Override + public void onFinished(Object resultObject) { + if (resultObject != null && resultObject instanceof List) { + List<InterpreterContextRunner> runnerList = + (List<InterpreterContextRunner>) 
resultObject; + for (InterpreterContextRunner r : runnerList) { + remoteRunners.add( + new ZeppelinServerResourceParagraphRunner(r.getNoteId(), r.getParagraphId()) + ); + } + + final RemoteZeppelinServerResource resResource = + new RemoteZeppelinServerResource(); + resResource.setOwnerKey(eventOwnerKey); + resResource.setResourceType(RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS); + resResource.setData(remoteRunners); + + interpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<Void>() { + @Override + public Void call(Client client) throws Exception { + client.onReceivedZeppelinResource(resResource.toJson()); + return null; + } + } + ); + } + } + + @Override + public void onError() { + logger.info("onGetParagraphRunners onError"); + } + }; + + remoteWorksEventListener.onGetParagraphRunners( + reqRunnerContext.getNoteId(), reqRunnerContext.getParagraphId(), callBackEvent); + } + } catch (Exception e) { + logger.error("Can't get RemoteInterpreterEvent", e); + waitQuietly(); + + } + } + + private void sendResourcePoolResponseGetAll(final ResourceSet resourceSet) { + interpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<Void>() { + @Override + public Void call(Client client) throws Exception { + List<String> resourceList = new LinkedList<>(); + for (Resource r : resourceSet) { + resourceList.add(r.toJson()); + } + client.resourcePoolResponseGetAll(resourceList); + return null; + } + } + ); + } + + private ResourceSet getAllResourcePoolExcept() { + ResourceSet resourceSet = new ResourceSet(); + for (ManagedInterpreterGroup intpGroup : interpreterGroup.getInterpreterSetting() + .getInterpreterSettingManager().getAllInterpreterGroup()) { + if (intpGroup.getId().equals(interpreterGroup.getId())) { + continue; + } + + RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess(); + if (remoteInterpreterProcess == null) { + ResourcePool localPool = intpGroup.getResourcePool(); + if 
(localPool != null) { + resourceSet.addAll(localPool.getAll()); + } + } else if (interpreterProcess.isRunning()) { + List<String> resourceList = remoteInterpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<List<String>>() { + @Override + public List<String> call(Client client) throws Exception { + return client.resourcePoolGetAll(); + } + } + ); + for (String res : resourceList) { + resourceSet.add(Resource.fromJson(res)); + } + } + } + return resourceSet; + } + + private void sendResourceResponseGet(final ResourceId resourceId, final Object o) { + interpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<Void>() { + @Override + public Void call(Client client) throws Exception { + String rid = resourceId.toJson(); + ByteBuffer obj; + if (o == null) { + obj = ByteBuffer.allocate(0); + } else { + obj = Resource.serializeObject(o); + } + client.resourceResponseGet(rid, obj); + return null; + } + } + ); + } + + private Object getResource(final ResourceId resourceId) { + ManagedInterpreterGroup intpGroup = interpreterGroup.getInterpreterSetting() + .getInterpreterSettingManager() + .getInterpreterGroupById(resourceId.getResourcePoolId()); + if (intpGroup == null) { + return null; + } + RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess(); + ByteBuffer buffer = remoteInterpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() { + @Override + public ByteBuffer call(Client client) throws Exception { + return client.resourceGet( + resourceId.getNoteId(), + resourceId.getParagraphId(), + resourceId.getName()); + } + } + ); + + try { + Object o = Resource.deserializeObject(buffer); + return o; + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + return null; + } + + public void sendInvokeMethodResult(final InvokeResourceMethodEventMessage message, + final Object o) { + interpreterProcess.callRemoteFunction( + new 
RemoteInterpreterProcess.RemoteFunction<Void>() { + @Override + public Void call(Client client) throws Exception { + String invokeMessage = message.toJson(); + ByteBuffer obj; + if (o == null) { + obj = ByteBuffer.allocate(0); + } else { + obj = Resource.serializeObject(o); + } + client.resourceResponseInvokeMethod(invokeMessage, obj); + return null; + } + } + ); + } + + private Object invokeResourceMethod(final InvokeResourceMethodEventMessage message) { + final ResourceId resourceId = message.resourceId; + ManagedInterpreterGroup intpGroup = interpreterGroup.getInterpreterSetting() + .getInterpreterSettingManager().getInterpreterGroupById(resourceId.getResourcePoolId()); + if (intpGroup == null) { + return null; + } + + RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess(); + if (remoteInterpreterProcess == null) { + ResourcePool localPool = intpGroup.getResourcePool(); + if (localPool != null) { + Resource res = localPool.get(resourceId.getName()); + if (res != null) { + try { + return res.invokeMethod( + message.methodName, + message.getParamTypes(), + message.params, + message.returnResourceName); + } catch (Exception e) { + logger.error(e.getMessage(), e); + return null; + } + } else { + // object is null. 
can't invoke any method + logger.error("Can't invoke method {} on null object", message.methodName); + return null; + } + } else { + logger.error("no resource pool"); + return null; + } + } else if (interpreterProcess.isRunning()) { + ByteBuffer res = interpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() { + @Override + public ByteBuffer call(Client client) throws Exception { + return client.resourceInvokeMethod( + resourceId.getNoteId(), + resourceId.getParagraphId(), + resourceId.getName(), + message.toJson()); + } + } + ); + + try { + return Resource.deserializeObject(res); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + return null; + } + return null; + } + + private void waitQuietly() { + try { + synchronized (this) { + wait(1000); + } + } catch (InterruptedException ignored) { + logger.info("Error in RemoteInterpreterEventPoller while waitQuietly : ", ignored); + } + } + + public void shutdown() { + shutdown = true; + synchronized (this) { + notify(); + } + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java index 1fb9b90..19356fb 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java @@ -21,6 +21,7 @@ import org.apache.commons.exec.*; import org.apache.commons.exec.environment.EnvironmentUtils; import org.apache.zeppelin.helium.ApplicationEventListener; import 
org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,6 +98,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess // start server process try { port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); + logger.info("Choose port {} for RemoteInterpreterProcess", port); } catch (IOException e1) { throw new InterpreterException(e1); } @@ -172,6 +174,17 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess public void stop() { if (isRunning()) { logger.info("kill interpreter process"); + try { + callRemoteFunction(new RemoteFunction<Void>() { + @Override + public Void call(RemoteInterpreterService.Client client) throws Exception { + client.shutdown(); + return null; + } + }); + } catch (Exception e) { + logger.warn("ignore the exception when shutting down"); + } watchdog.destroyProcess(); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java new file mode 100644 index 0000000..d34c538 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter.remote; + +import com.google.gson.Gson; +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.thrift.TException; +import org.apache.zeppelin.helium.ApplicationEventListener; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Abstract class for interpreter process + */ +public abstract class RemoteInterpreterProcess { + private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class); + + private GenericObjectPool<Client> clientPool; + private final RemoteInterpreterEventPoller remoteInterpreterEventPoller; + private final InterpreterContextRunnerPool interpreterContextRunnerPool; + private int connectTimeout; + + public RemoteInterpreterProcess( + int connectTimeout, + RemoteInterpreterProcessListener listener, + ApplicationEventListener appListener) { + this(new RemoteInterpreterEventPoller(listener, appListener), + connectTimeout); + this.remoteInterpreterEventPoller.setInterpreterProcess(this); + } + + RemoteInterpreterProcess(RemoteInterpreterEventPoller remoteInterpreterEventPoller, + int connectTimeout) { + this.interpreterContextRunnerPool = new InterpreterContextRunnerPool(); + this.remoteInterpreterEventPoller = 
remoteInterpreterEventPoller; + this.connectTimeout = connectTimeout; + } + + public RemoteInterpreterEventPoller getRemoteInterpreterEventPoller() { + return remoteInterpreterEventPoller; + } + + public abstract String getHost(); + public abstract int getPort(); + public abstract void start(String userName, Boolean isUserImpersonate); + public abstract void stop(); + public abstract boolean isRunning(); + + public int getConnectTimeout() { + return connectTimeout; + } + + public synchronized Client getClient() throws Exception { + if (clientPool == null || clientPool.isClosed()) { + clientPool = new GenericObjectPool<>(new ClientFactory(getHost(), getPort())); + } + return clientPool.borrowObject(); + } + + private void releaseClient(Client client) { + releaseClient(client, false); + } + + private void releaseClient(Client client, boolean broken) { + if (broken) { + releaseBrokenClient(client); + } else { + try { + clientPool.returnObject(client); + } catch (Exception e) { + logger.warn("exception occurred during releasing thrift client", e); + } + } + } + + private void releaseBrokenClient(Client client) { + try { + clientPool.invalidateObject(client); + } catch (Exception e) { + logger.warn("exception occurred during releasing thrift client", e); + } + } + + /** + * Called when angular object is updated in client side to propagate + * change to the remote process + * @param name + * @param o + */ + public void updateRemoteAngularObject(String name, String noteId, String paragraphId, Object o) { + Client client = null; + try { + client = getClient(); + } catch (NullPointerException e) { + // remote process not started + logger.info("NullPointerException in RemoteInterpreterProcess while " + + "updateRemoteAngularObject getClient, remote process not started", e); + return; + } catch (Exception e) { + logger.error("Can't update angular object", e); + } + + boolean broken = false; + try { + Gson gson = new Gson(); + client.angularObjectUpdate(name, noteId, 
paragraphId, gson.toJson(o)); + } catch (TException e) { + broken = true; + logger.error("Can't update angular object", e); + } catch (NullPointerException e) { + logger.error("Remote interpreter process not started", e); + return; + } finally { + if (client != null) { + releaseClient(client, broken); + } + } + } + + public InterpreterContextRunnerPool getInterpreterContextRunnerPool() { + return interpreterContextRunnerPool; + } + + public <T> T callRemoteFunction(RemoteFunction<T> func) { + Client client = null; + boolean broken = false; + try { + client = getClient(); + if (client != null) { + return func.call(client); + } + } catch (TException e) { + broken = true; + throw new InterpreterException(e); + } catch (Exception e1) { + throw new InterpreterException(e1); + } finally { + if (client != null) { + releaseClient(client, broken); + } + } + return null; + } + + /** + * + * @param <T> + */ + public interface RemoteFunction<T> { + T call(Client client) throws Exception; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java new file mode 100644 index 0000000..8b23bf2 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter.remote; + +import org.apache.zeppelin.interpreter.InterpreterResult; + +import java.util.Map; + +/** + * Event from remoteInterpreterProcess + */ +public interface RemoteInterpreterProcessListener { + public void onOutputAppend(String noteId, String paragraphId, int index, String output); + public void onOutputUpdated( + String noteId, String paragraphId, int index, InterpreterResult.Type type, String output); + public void onOutputClear(String noteId, String paragraphId); + public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos); + public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception; + public void onGetParagraphRunners( + String noteId, String paragraphId, RemoteWorksEventListener callback); + + /** + * Remote works for Interpreter callback listener + */ + public interface RemoteWorksEventListener { + public void onFinished(Object resultObject); + public void onError(); + } + public void onParaInfosReceived(String noteId, String paragraphId, + String interpreterSettingId, Map<String, String> metaInfos); +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java ---------------------------------------------------------------------- diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java index 1505db9..bc71d89 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java @@ -17,7 +17,6 @@ package org.apache.zeppelin.notebook; import org.apache.zeppelin.helium.HeliumPackage; -import org.apache.zeppelin.interpreter.InterpreterGroup; /** * Current state of application http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 198e278..5a42f37 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -20,7 +20,6 @@ package org.apache.zeppelin.notebook; import static java.lang.String.format; import java.io.IOException; -import java.io.Serializable; import java.util.*; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -28,8 +27,6 @@ import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; import com.google.gson.GsonBuilder; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; import org.apache.commons.lang.StringUtils; import org.apache.zeppelin.common.JsonSerializable; import org.apache.zeppelin.conf.ZeppelinConfiguration; @@ -41,7 +38,6 @@ import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.notebook.repo.NotebookRepo; import 
org.apache.zeppelin.notebook.utility.IdHashes; -import org.apache.zeppelin.resource.ResourcePoolUtils; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.search.SearchService; @@ -126,11 +122,6 @@ public class Note implements ParagraphJobListener, JsonSerializable { id = IdHashes.generateId(); } - private String getDefaultInterpreterName() { - InterpreterSetting setting = interpreterSettingManager.getDefaultInterpreterSetting(getId()); - return null != setting ? setting.getName() : StringUtils.EMPTY; - } - public boolean isPersonalizedMode() { Object v = getConfig().get("personalizedMode"); return null != v && "true".equals(v); @@ -385,7 +376,7 @@ public class Note implements ParagraphJobListener, JsonSerializable { */ public Paragraph removeParagraph(String user, String paragraphId) { removeAllAngularObjectInParagraph(user, paragraphId); - ResourcePoolUtils.removeResourcesBelongsToParagraph(getId(), paragraphId); + interpreterSettingManager.removeResourcesBelongsToParagraph(getId(), paragraphId); synchronized (paragraphs) { Iterator<Paragraph> i = paragraphs.iterator(); while (i.hasNext()) { @@ -690,7 +681,7 @@ public class Note implements ParagraphJobListener, JsonSerializable { } for (InterpreterSetting setting : settings) { - InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id); + InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id); AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry(); angularObjects.put(intpGroup.getId(), registry.getAllWithGlobal(id)); } @@ -705,7 +696,7 @@ public class Note implements ParagraphJobListener, JsonSerializable { } for (InterpreterSetting setting : settings) { - InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id); + InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id); AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry(); if (registry instanceof 
RemoteAngularObjectRegistry) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index a0c1dff..4652fcd 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -18,7 +18,6 @@ package org.apache.zeppelin.notebook; import java.io.IOException; -import java.io.StringReader; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -35,10 +34,8 @@ import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; import com.google.common.collect.Sets; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.stream.JsonReader; import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.quartz.CronScheduleBuilder; import org.quartz.CronTrigger; import org.quartz.JobBuilder; @@ -56,11 +53,9 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; -import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.notebook.repo.NotebookRepo; import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision; import org.apache.zeppelin.notebook.repo.NotebookRepoSync; -import org.apache.zeppelin.resource.ResourcePoolUtils; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.SchedulerFactory; import 
org.apache.zeppelin.search.SearchService; @@ -140,7 +135,7 @@ public class Notebook implements NoteEventListener { Preconditions.checkNotNull(subject, "AuthenticationInfo should not be null"); Note note; if (conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING)) { - note = createNote(interpreterSettingManager.getDefaultInterpreterSettingList(), subject); + note = createNote(interpreterSettingManager.getInterpreterSettingIds(), subject); } else { note = createNote(null, subject); } @@ -270,8 +265,8 @@ public class Notebook implements NoteEventListener { } } - interpreterSettingManager.setInterpreters(user, note.getId(), interpreterSettingIds); - // comment out while note.getNoteReplLoader().setInterpreters(...) do the same + interpreterSettingManager.setInterpreterBinding(user, note.getId(), interpreterSettingIds); + // comment out while note.getNoteReplLoader().setInterpreterBinding(...) do the same // replFactory.putNoteInterpreterSettingBinding(id, interpreterSettingIds); } } @@ -279,7 +274,7 @@ public class Notebook implements NoteEventListener { List<String> getBindedInterpreterSettingsIds(String id) { Note note = getNote(id); if (note != null) { - return interpreterSettingManager.getInterpreters(note.getId()); + return interpreterSettingManager.getInterpreterBinding(note.getId()); } else { return new LinkedList<>(); } @@ -313,9 +308,10 @@ public class Notebook implements NoteEventListener { } public void moveNoteToTrash(String noteId) { - for (InterpreterSetting interpreterSetting : interpreterSettingManager - .getInterpreterSettings(noteId)) { - interpreterSettingManager.removeInterpretersForNote(interpreterSetting, "", noteId); + try { + interpreterSettingManager.setInterpreterBinding("", noteId, new ArrayList<String>()); /* rebinds the note to an empty setting-id list, passing "" in the user position */ + } catch (IOException e) { + e.printStackTrace(); /* NOTE(review): the failure is only printed to stderr, neither logged nor rethrown - confirm this is intended */ } } @@ -339,7 +335,7 @@ public class Notebook implements NoteEventListener { // remove from all interpreter instance's angular object registry for (InterpreterSetting
settings : interpreterSettingManager.get()) { AngularObjectRegistry registry = - settings.getInterpreterGroup(subject.getUser(), id).getAngularObjectRegistry(); + settings.getOrCreateInterpreterGroup(subject.getUser(), id).getAngularObjectRegistry(); if (registry instanceof RemoteAngularObjectRegistry) { // remove paragraph scope object for (Paragraph p : note.getParagraphs()) { @@ -374,7 +370,7 @@ public class Notebook implements NoteEventListener { } } - ResourcePoolUtils.removeResourcesBelongsToNote(id); + interpreterSettingManager.removeResourcesBelongsToNote(id); fireNoteRemoveEvent(note); @@ -521,7 +517,8 @@ public class Notebook implements NoteEventListener { SnapshotAngularObject snapshot = angularObjectSnapshot.get(name); List<InterpreterSetting> settings = interpreterSettingManager.get(); for (InterpreterSetting setting : settings) { - InterpreterGroup intpGroup = setting.getInterpreterGroup(subject.getUser(), note.getId()); + InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(subject.getUser(), + note.getId()); if (intpGroup.getId().equals(snapshot.getIntpGroupId())) { AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry(); String noteId = snapshot.getAngularObject().getNoteId(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 37138e6..161dc30 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -28,8 +28,6 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import 
org.apache.commons.lang.StringUtils; import org.apache.zeppelin.common.JsonSerializable; @@ -93,10 +91,10 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable { // since zeppelin-0.7.0, zeppelin stores multiple results of the paragraph // see ZEPPELIN-212 - Object results; + volatile Object results; // For backward compatibility of note.json format after ZEPPELIN-212 - Object result; + volatile Object result; private Map<String, ParagraphRuntimeInfo> runtimeInfos; /** @@ -157,7 +155,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable { } @Override - public void setResult(Object results) { + public synchronized void setResult(Object results) { /* assigns the 'results' field, not the backward-compatibility 'result' field */ this.results = results; } @@ -354,7 +352,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable { } @Override - public Object getReturn() { + public synchronized Object getReturn() { /* returns the same 'results' field that setResult assigns */ return results; } @@ -401,6 +399,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable { logger.error("Can not find interpreter name " + repl); throw new RuntimeException("Can not find interpreter for " + getRequiredReplName()); } + //TODO(zjffdu) check interpreter setting status in interpreter setting itself InterpreterSetting intp = getInterpreterSettingById(repl.getInterpreterGroup().getId()); while (intp.getStatus().equals( org.apache.zeppelin.interpreter.InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES)) { @@ -560,8 +559,10 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable { if (!interpreterSettingManager.getInterpreterSettings(note.getId()).isEmpty()) { InterpreterSetting intpGroup = interpreterSettingManager.getInterpreterSettings(note.getId()).get(0); - registry = intpGroup.getInterpreterGroup(getUser(), note.getId()).getAngularObjectRegistry(); - resourcePool = intpGroup.getInterpreterGroup(getUser(), note.getId()).getResourcePool(); + registry = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId()) +
.getAngularObjectRegistry(); + resourcePool = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId()) + .getResourcePool(); } List<InterpreterContextRunner> runners = new LinkedList<>(); @@ -591,8 +592,10 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable { if (!interpreterSettingManager.getInterpreterSettings(note.getId()).isEmpty()) { InterpreterSetting intpGroup = interpreterSettingManager.getInterpreterSettings(note.getId()).get(0); - registry = intpGroup.getInterpreterGroup(getUser(), note.getId()).getAngularObjectRegistry(); - resourcePool = intpGroup.getInterpreterGroup(getUser(), note.getId()).getResourcePool(); + registry = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId()) + .getAngularObjectRegistry(); + resourcePool = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId()) + .getResourcePool(); } List<InterpreterContextRunner> runners = new LinkedList<>();