http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java deleted file mode 100644 index 12e0caa..0000000 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ /dev/null @@ -1,597 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.remote; - -import java.util.*; - -import org.apache.commons.lang3.StringUtils; -import org.apache.thrift.TException; -import org.apache.zeppelin.display.AngularObject; -import org.apache.zeppelin.display.AngularObjectRegistry; -import org.apache.zeppelin.display.GUI; -import org.apache.zeppelin.helium.ApplicationEventListener; -import org.apache.zeppelin.display.Input; -import org.apache.zeppelin.interpreter.*; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; -import org.apache.zeppelin.scheduler.Scheduler; -import org.apache.zeppelin.scheduler.SchedulerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; - -/** - * Proxy for Interpreter instance that runs on separate process - */ -public class RemoteInterpreter extends Interpreter { - private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class); - - private final RemoteInterpreterProcessListener remoteInterpreterProcessListener; - private final ApplicationEventListener applicationEventListener; - private Gson gson = new Gson(); - private String interpreterRunner; - private String interpreterPath; - private String localRepoPath; - private String className; - private String sessionKey; - private FormType formType; - private boolean initialized; - private Map<String, String> env; - private int connectTimeout; - private int maxPoolSize; - private String host; - private int port; - private String userName; - private Boolean isUserImpersonate; - private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT; - private String interpreterGroupName; - - /** - * Remote interpreter and manage interpreter process - */ - public RemoteInterpreter(Properties property, String sessionKey, String className, - String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout, - int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener, - ApplicationEventListener appListener, String userName, Boolean isUserImpersonate, - int outputLimit, String interpreterGroupName) { - super(property); - this.sessionKey = sessionKey; - this.className = className; - initialized = false; - this.interpreterRunner = interpreterRunner; - this.interpreterPath = interpreterPath; - this.localRepoPath = localRepoPath; - env = getEnvFromInterpreterProperty(property); - this.connectTimeout = connectTimeout; - this.maxPoolSize = maxPoolSize; - this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; - this.applicationEventListener = appListener; - this.userName = userName; - this.isUserImpersonate = isUserImpersonate; - this.outputLimit = outputLimit; - this.interpreterGroupName = interpreterGroupName; - } - - - /** - * Connect to existing process - */ - public RemoteInterpreter(Properties property, String sessionKey, String className, String host, - int port, String localRepoPath, int connectTimeout, int maxPoolSize, - RemoteInterpreterProcessListener remoteInterpreterProcessListener, - ApplicationEventListener appListener, String userName, Boolean isUserImpersonate, - int outputLimit) { - super(property); - this.sessionKey = sessionKey; - this.className = className; - initialized = false; - this.host = host; - this.port = port; - this.localRepoPath = localRepoPath; - this.connectTimeout = connectTimeout; - this.maxPoolSize = maxPoolSize; - this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; - this.applicationEventListener = appListener; - this.userName = userName; - this.isUserImpersonate = isUserImpersonate; - this.outputLimit = outputLimit; - } - - - // VisibleForTesting - public RemoteInterpreter(Properties property, String sessionKey, String className, - String interpreterRunner, String interpreterPath, String localRepoPath, - Map<String, String> env, int connectTimeout, - RemoteInterpreterProcessListener remoteInterpreterProcessListener, - ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) { - super(property); - this.className = className; - this.sessionKey = sessionKey; - this.interpreterRunner = interpreterRunner; - this.interpreterPath = interpreterPath; - this.localRepoPath = localRepoPath; - env.putAll(getEnvFromInterpreterProperty(property)); - this.env = env; - this.connectTimeout = connectTimeout; - this.maxPoolSize = 10; - this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; - this.applicationEventListener = appListener; - this.userName = userName; - this.isUserImpersonate = isUserImpersonate; - } - - private Map<String, String> getEnvFromInterpreterProperty(Properties property) { - Map<String, String> env = new HashMap<String, String>(); - StringBuilder sparkConfBuilder = new StringBuilder(); - for (String key : property.stringPropertyNames()) { - if (RemoteInterpreterUtils.isEnvString(key)) { - env.put(key, property.getProperty(key)); - } - if (key.equals("master")) { - sparkConfBuilder.append(" --master " + property.getProperty("master")); - } - if (isSparkConf(key, property.getProperty(key))) { - sparkConfBuilder.append(" --conf " + key + "=" + - toShellFormat(property.getProperty(key))); - } - } - env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString()); - return env; - } - - private String toShellFormat(String value) { - if (value.contains("\'") && value.contains("\"")) { - throw new RuntimeException("Spark property value could not contain both \" and '"); - } else if (value.contains("\'")) { - return "\"" + value + "\""; - } else { - return "\'" + value + "\'"; - } - } - - static boolean isSparkConf(String key, String value) { - return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value); - } - - @Override - public String getClassName() { - return className; - } - - private boolean connectToExistingProcess() { - return host != null && port > 0; - } - - public RemoteInterpreterProcess getInterpreterProcess() { - InterpreterGroup intpGroup = getInterpreterGroup(); - if (intpGroup == null) { - return null; - } - - synchronized (intpGroup) { - if (intpGroup.getRemoteInterpreterProcess() == null) { - RemoteInterpreterProcess remoteProcess; - if (connectToExistingProcess()) { - remoteProcess = new RemoteInterpreterRunningProcess( - connectTimeout, - remoteInterpreterProcessListener, - applicationEventListener, - host, - port); - } else { - // create new remote process - remoteProcess = new RemoteInterpreterManagedProcess( - interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout, - remoteInterpreterProcessListener, applicationEventListener, interpreterGroupName); - } - - intpGroup.setRemoteInterpreterProcess(remoteProcess); - } - - return intpGroup.getRemoteInterpreterProcess(); - } - } - - public synchronized void init() { - if (initialized == true) { - return; - } - - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - - final InterpreterGroup interpreterGroup = getInterpreterGroup(); - - interpreterProcess.setMaxPoolSize( - Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize())); - String groupId = interpreterGroup.getId(); - - synchronized (interpreterProcess) { - Client client = null; - try { - client = interpreterProcess.getClient(); - } catch (Exception e1) { - throw new InterpreterException(e1); - } - - boolean broken = false; - try { - logger.info("Create remote interpreter {}", getClassName()); - if (localRepoPath != null) { - property.put("zeppelin.interpreter.localRepo", localRepoPath); - } - - property.put("zeppelin.interpreter.output.limit", Integer.toString(outputLimit)); - client.createInterpreter(groupId, sessionKey, - getClassName(), (Map) property, userName); - // Push angular object loaded from JSON file to remote interpreter - if (!interpreterGroup.isAngularRegistryPushed()) { - pushAngularObjectRegistryToRemote(client); - interpreterGroup.setAngularRegistryPushed(true); - } - - } catch (TException e) { - logger.error("Failed to create interpreter: {}", getClassName()); - throw new InterpreterException(e); - } finally { - // TODO(jongyoul): Fixed it when not all of interpreter in same interpreter group are broken - interpreterProcess.releaseClient(client, broken); - } - } - initialized = true; - } - - - @Override - public void open() { - InterpreterGroup interpreterGroup = getInterpreterGroup(); - - synchronized (interpreterGroup) { - // initialize all interpreters in this interpreter group - List<Interpreter> interpreters = interpreterGroup.get(sessionKey); - // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however, - // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it - // doesn't call open method, it's not open. It causes problem while running intp.close() - // In case of Spark, this method initializes all of interpreters and init() method increases - // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all - // other interpreters doesn't do anything because those LazyInterpreters aren't open. - // But for now, we have to initialise all of interpreters for some reasons. - // See Interpreter.getInterpreterInTheSameSessionByClassName(String) - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - if (!initialized) { - // reference per session - interpreterProcess.reference(interpreterGroup, userName, isUserImpersonate); - } - for (Interpreter intp : new ArrayList<>(interpreters)) { - Interpreter p = intp; - while (p instanceof WrappedInterpreter) { - p = ((WrappedInterpreter) p).getInnerInterpreter(); - } - try { - ((RemoteInterpreter) p).init(); - } catch (InterpreterException e) { - logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup", - p.getClassName()); - interpreters.remove(p); - } - } - } - } - - @Override - public void close() { - InterpreterGroup interpreterGroup = getInterpreterGroup(); - synchronized (interpreterGroup) { - // close all interpreters in this session - List<Interpreter> interpreters = interpreterGroup.get(sessionKey); - // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however, - // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it - // doesn't call open method, it's not open. It causes problem while running intp.close() - // In case of Spark, this method initializes all of interpreters and init() method increases - // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all - // other interpreters doesn't do anything because those LazyInterpreters aren't open. - // But for now, we have to initialise all of interpreters for some reasons. - // See Interpreter.getInterpreterInTheSameSessionByClassName(String) - if (initialized) { - // dereference per session - getInterpreterProcess().dereference(); - } - for (Interpreter intp : new ArrayList<>(interpreters)) { - Interpreter p = intp; - while (p instanceof WrappedInterpreter) { - p = ((WrappedInterpreter) p).getInnerInterpreter(); - } - try { - ((RemoteInterpreter) p).closeInterpreter(); - } catch (InterpreterException e) { - logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup", - p.getClassName()); - interpreters.remove(p); - } - } - } - } - - public void closeInterpreter() { - if (this.initialized == false) { - return; - } - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - Client client = null; - boolean broken = false; - try { - client = interpreterProcess.getClient(); - if (client != null) { - client.close(sessionKey, className); - } - } catch (TException e) { - broken = true; - throw new InterpreterException(e); - } catch (Exception e1) { - throw new InterpreterException(e1); - } finally { - if (client != null) { - interpreterProcess.releaseClient(client, broken); - } - this.initialized = false; - } - } - - @Override - public InterpreterResult interpret(String st, InterpreterContext context) { - if (logger.isDebugEnabled()) { - logger.debug("st:\n{}", st); - } - - FormType form = getFormType(); - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - Client client = null; - try { - client = interpreterProcess.getClient(); - } catch (Exception e1) { - throw new InterpreterException(e1); - } - - InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess - .getInterpreterContextRunnerPool(); - - List<InterpreterContextRunner> runners = context.getRunners(); - if (runners != null && runners.size() != 0) { - // assume all runners in this InterpreterContext have the same note id - String noteId = runners.get(0).getNoteId(); - - interpreterContextRunnerPool.clear(noteId); - interpreterContextRunnerPool.addAll(noteId, runners); - } - - boolean broken = false; - try { - - final GUI currentGUI = context.getGui(); - RemoteInterpreterResult remoteResult = client.interpret( - sessionKey, className, st, convert(context)); - - Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson( - remoteResult.getConfig(), new TypeToken<Map<String, Object>>() { - }.getType()); - context.getConfig().clear(); - context.getConfig().putAll(remoteConfig); - - if (form == FormType.NATIVE) { - GUI remoteGui = GUI.fromJson(remoteResult.getGui()); - currentGUI.clear(); - currentGUI.setParams(remoteGui.getParams()); - currentGUI.setForms(remoteGui.getForms()); - } else if (form == FormType.SIMPLE) { - final Map<String, Input> currentForms = currentGUI.getForms(); - final Map<String, Object> currentParams = currentGUI.getParams(); - final GUI remoteGUI = GUI.fromJson(remoteResult.getGui()); - final Map<String, Input> remoteForms = remoteGUI.getForms(); - final Map<String, Object> remoteParams = remoteGUI.getParams(); - currentForms.putAll(remoteForms); - currentParams.putAll(remoteParams); - } - - InterpreterResult result = convert(remoteResult); - return result; - } catch (TException e) { - broken = true; - throw new InterpreterException(e); - } finally { - interpreterProcess.releaseClient(client, broken); - } - } - - @Override - public void cancel(InterpreterContext context) { - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - Client client = null; - try { - client = interpreterProcess.getClient(); - } catch (Exception e1) { - throw new InterpreterException(e1); - } - - boolean broken = false; - try { - client.cancel(sessionKey, className, convert(context)); - } catch (TException e) { - broken = true; - throw new InterpreterException(e); - } finally { - interpreterProcess.releaseClient(client, broken); - } - } - - @Override - public FormType getFormType() { - open(); - - if (formType != null) { - return formType; - } - - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - Client client = null; - try { - client = interpreterProcess.getClient(); - } catch (Exception e1) { - throw new InterpreterException(e1); - } - - boolean broken = false; - try { - formType = FormType.valueOf(client.getFormType(sessionKey, className)); - return formType; - } catch (TException e) { - broken = true; - throw new InterpreterException(e); - } finally { - interpreterProcess.releaseClient(client, broken); - } - } - - @Override - public int getProgress(InterpreterContext context) { - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - if (interpreterProcess == null || !interpreterProcess.isRunning()) { - return 0; - } - - Client client = null; - try { - client = interpreterProcess.getClient(); - } catch (Exception e1) { - throw new InterpreterException(e1); - } - - boolean broken = false; - try { - return client.getProgress(sessionKey, className, convert(context)); - } catch (TException e) { - broken = true; - throw new InterpreterException(e); - } finally { - interpreterProcess.releaseClient(client, broken); - } - } - - - @Override - public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - Client client = null; - try { - client = interpreterProcess.getClient(); - } catch (Exception e1) { - throw new InterpreterException(e1); - } - - boolean broken = false; - try { - List completion = client.completion(sessionKey, className, buf, cursor, - convert(interpreterContext)); - return completion; - } catch (TException e) { - broken = true; - throw new InterpreterException(e); - } finally { - interpreterProcess.releaseClient(client, broken); - } - } - - @Override - public Scheduler getScheduler() { - int maxConcurrency = maxPoolSize; - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - if (interpreterProcess == null) { - return null; - } else { - return SchedulerFactory.singleton().createOrGetRemoteScheduler( - RemoteInterpreter.class.getName() + sessionKey + interpreterProcess.hashCode(), - sessionKey, interpreterProcess, maxConcurrency); - } - } - - private String getInterpreterGroupKey(InterpreterGroup interpreterGroup) { - return interpreterGroup.getId(); - } - - private RemoteInterpreterContext convert(InterpreterContext ic) { - return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), ic.getReplName(), - ic.getParagraphTitle(), ic.getParagraphText(), ic.getAuthenticationInfo().toJson(), - gson.toJson(ic.getConfig()), ic.getGui().toJson(), gson.toJson(ic.getRunners())); - } - - private InterpreterResult convert(RemoteInterpreterResult result) { - InterpreterResult r = new InterpreterResult( - InterpreterResult.Code.valueOf(result.getCode())); - - for (RemoteInterpreterResultMessage m : result.getMsg()) { - r.add(InterpreterResult.Type.valueOf(m.getType()), m.getData()); - } - - return r; - } - - /** - * Push local angular object registry to - * remote interpreter. This method should be - * call ONLY inside the init() method - */ - void pushAngularObjectRegistryToRemote(Client client) throws TException { - final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup() - .getAngularObjectRegistry(); - - if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) { - final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry - .getRegistry(); - - logger.info("Push local angular object registry from ZeppelinServer to" + - " remote interpreter group {}", this.getInterpreterGroup().getId()); - - final java.lang.reflect.Type registryType = new TypeToken<Map<String, - Map<String, AngularObject>>>() { - }.getType(); - - Gson gson = new Gson(); - client.angularRegistryPush(gson.toJson(registry, registryType)); - } - } - - public Map<String, String> getEnv() { - return env; - } - - public void addEnv(Map<String, String> env) { - if (this.env == null) { - this.env = new HashMap<>(); - } - this.env.putAll(env); - } - - //Only for test - public String getInterpreterRunner() { - return interpreterRunner; - } -}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java deleted file mode 100644 index 1fb9b90..0000000 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.remote; - -import org.apache.commons.exec.*; -import org.apache.commons.exec.environment.EnvironmentUtils; -import org.apache.zeppelin.helium.ApplicationEventListener; -import org.apache.zeppelin.interpreter.InterpreterException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.Map; - -/** - * This class manages start / stop of remote interpreter process - */ -public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess - implements ExecuteResultHandler { - private static final Logger logger = LoggerFactory.getLogger( - RemoteInterpreterManagedProcess.class); - private final String interpreterRunner; - - private DefaultExecutor executor; - private ExecuteWatchdog watchdog; - boolean running = false; - private int port = -1; - private final String interpreterDir; - private final String localRepoDir; - private final String interpreterGroupName; - - private Map<String, String> env; - - public RemoteInterpreterManagedProcess( - String intpRunner, - String intpDir, - String localRepoDir, - Map<String, String> env, - int connectTimeout, - RemoteInterpreterProcessListener listener, - ApplicationEventListener appListener, - String interpreterGroupName) { - super(new RemoteInterpreterEventPoller(listener, appListener), - connectTimeout); - this.interpreterRunner = intpRunner; - this.env = env; - this.interpreterDir = intpDir; - this.localRepoDir = localRepoDir; - this.interpreterGroupName = interpreterGroupName; - } - - RemoteInterpreterManagedProcess(String intpRunner, - String intpDir, - String localRepoDir, - Map<String, String> env, - RemoteInterpreterEventPoller remoteInterpreterEventPoller, - int connectTimeout, - String interpreterGroupName) { - super(remoteInterpreterEventPoller, - connectTimeout); - this.interpreterRunner = intpRunner; - this.env = env; - this.interpreterDir = intpDir; - this.localRepoDir = localRepoDir; - this.interpreterGroupName = interpreterGroupName; - } - - @Override - public String getHost() { - return "localhost"; - } - - @Override - public int getPort() { - return port; - } - - @Override - public void start(String userName, Boolean isUserImpersonate) { - // start server process - try { - port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); - } catch (IOException e1) { - throw new InterpreterException(e1); - } - - CommandLine cmdLine = CommandLine.parse(interpreterRunner); - cmdLine.addArgument("-d", false); - cmdLine.addArgument(interpreterDir, false); - cmdLine.addArgument("-p", false); - cmdLine.addArgument(Integer.toString(port), false); - if (isUserImpersonate && !userName.equals("anonymous")) { - cmdLine.addArgument("-u", false); - cmdLine.addArgument(userName, false); - } - cmdLine.addArgument("-l", false); - cmdLine.addArgument(localRepoDir, false); - cmdLine.addArgument("-g", false); - cmdLine.addArgument(interpreterGroupName, false); - - executor = new DefaultExecutor(); - - ByteArrayOutputStream cmdOut = new ByteArrayOutputStream(); - ProcessLogOutputStream processOutput = new ProcessLogOutputStream(logger); - processOutput.setOutputStream(cmdOut); - - executor.setStreamHandler(new PumpStreamHandler(processOutput)); - watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT); - executor.setWatchdog(watchdog); - - try { - Map procEnv = EnvironmentUtils.getProcEnvironment(); - procEnv.putAll(env); - - logger.info("Run interpreter process {}", cmdLine); - executor.execute(cmdLine, procEnv, this); - running = true; - } catch (IOException e) { - running = false; - throw new InterpreterException(e); - } - - - long startTime = System.currentTimeMillis(); - while (System.currentTimeMillis() - startTime < getConnectTimeout()) { - if (!running) { - try { - cmdOut.flush(); - } catch (IOException e) { - // nothing to do - } - throw new InterpreterException(new String(cmdOut.toByteArray())); - } - - try { - if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) { - break; - } else { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - logger.error("Exception in RemoteInterpreterProcess while synchronized reference " + - "Thread.sleep", e); - } - } - } catch (Exception e) { - if (logger.isDebugEnabled()) { - logger.debug("Remote interpreter not yet accessible at localhost:" + port); - } - } - } - processOutput.setOutputStream(null); - } - - public void stop() { - if (isRunning()) { - logger.info("kill interpreter process"); - watchdog.destroyProcess(); - } - - executor = null; - watchdog = null; - running = false; - logger.info("Remote process terminated"); - } - - @Override - public void onProcessComplete(int exitValue) { - logger.info("Interpreter process exited {}", exitValue); - running = false; - - } - - @Override - public void onProcessFailed(ExecuteException e) { - logger.info("Interpreter process failed {}", e); - running = false; - } - - public boolean isRunning() { - return running; - } - - private static class ProcessLogOutputStream extends LogOutputStream { - - private Logger logger; - OutputStream out; - - public ProcessLogOutputStream(Logger logger) { - this.logger = logger; - } - - @Override - protected void processLine(String s, int i) { - this.logger.debug(s); - } - - @Override - public void write(byte [] b) throws IOException { - super.write(b); - - if (out != null) { - synchronized (this) { - if (out != null) { - out.write(b); - } - } - } - } - - @Override - public void write(byte [] b, int offset, int len) throws IOException { - super.write(b, offset, len); - - if (out != null) { - synchronized (this) { - if (out != null) { - out.write(b, offset, len); - } - } - } - } - - public void setOutputStream(OutputStream out) { - synchronized (this) { - this.out = out; - } - } - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java deleted file mode 100644 index bb176be..0000000 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.interpreter.remote; - -import org.apache.zeppelin.helium.ApplicationEventListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class connects to existing process - */ -public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess { - private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class); - private final String host; - private final int port; - - public RemoteInterpreterRunningProcess( - int connectTimeout, - RemoteInterpreterProcessListener listener, - ApplicationEventListener appListener, - String host, - int port - ) { - super(connectTimeout, listener, appListener); - this.host = host; - this.port = port; - } - - @Override - public String getHost() { - return host; - } - - @Override - public int getPort() { - return port; - } - - @Override - public void start(String userName, Boolean isUserImpersonate) { - // assume process is externally managed. nothing to do - } - - @Override - public void stop() { - // assume process is externally managed. nothing to do - } - - @Override - public boolean isRunning() { - return RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort()); - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java index 1505db9..bc71d89 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java @@ -17,7 +17,6 @@ package org.apache.zeppelin.notebook; import org.apache.zeppelin.helium.HeliumPackage; -import org.apache.zeppelin.interpreter.InterpreterGroup; /** * Current state of application http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 198e278..4a93d08 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -41,7 +41,6 @@ import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.notebook.repo.NotebookRepo; import org.apache.zeppelin.notebook.utility.IdHashes; -import org.apache.zeppelin.resource.ResourcePoolUtils; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.search.SearchService; @@ -126,11 +125,6 @@ public class Note implements ParagraphJobListener, JsonSerializable { id = IdHashes.generateId(); } - private String getDefaultInterpreterName() { - InterpreterSetting setting = interpreterSettingManager.getDefaultInterpreterSetting(getId()); - return null != setting ? setting.getName() : StringUtils.EMPTY; - } - public boolean isPersonalizedMode() { Object v = getConfig().get("personalizedMode"); return null != v && "true".equals(v); @@ -385,7 +379,7 @@ public class Note implements ParagraphJobListener, JsonSerializable { */ public Paragraph removeParagraph(String user, String paragraphId) { removeAllAngularObjectInParagraph(user, paragraphId); - ResourcePoolUtils.removeResourcesBelongsToParagraph(getId(), paragraphId); + interpreterSettingManager.removeResourcesBelongsToParagraph(getId(), paragraphId); synchronized (paragraphs) { Iterator<Paragraph> i = paragraphs.iterator(); while (i.hasNext()) { @@ -690,7 +684,7 @@ public class Note implements ParagraphJobListener, JsonSerializable { } for (InterpreterSetting setting : settings) { - InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id); + InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id); AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry(); angularObjects.put(intpGroup.getId(), registry.getAllWithGlobal(id)); } @@ -705,7 +699,7 @@ public class Note implements ParagraphJobListener, JsonSerializable { } for (InterpreterSetting setting : settings) { - InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id); + InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id); AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry(); if (registry instanceof RemoteAngularObjectRegistry) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index a0c1dff..fd3111b 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -60,7 +60,6 @@ import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.notebook.repo.NotebookRepo; import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision; import org.apache.zeppelin.notebook.repo.NotebookRepoSync; -import org.apache.zeppelin.resource.ResourcePoolUtils; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.SchedulerFactory; import org.apache.zeppelin.search.SearchService; @@ -140,7 +139,7 @@ public class Notebook implements NoteEventListener { Preconditions.checkNotNull(subject, "AuthenticationInfo should not be null"); Note note; if (conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING)) { - note = createNote(interpreterSettingManager.getDefaultInterpreterSettingList(), subject); + note = createNote(interpreterSettingManager.getInterpreterSettingIds(), subject); } else { note = createNote(null, subject); } @@ -270,8 +269,8 @@ public class Notebook implements NoteEventListener { } } - interpreterSettingManager.setInterpreters(user, note.getId(), interpreterSettingIds); - // comment out while note.getNoteReplLoader().setInterpreters(...) do the same + interpreterSettingManager.setInterpreterBinding(user, note.getId(), interpreterSettingIds); + // comment out while note.getNoteReplLoader().setInterpreterBinding(...) do the same // replFactory.putNoteInterpreterSettingBinding(id, interpreterSettingIds); } } @@ -279,7 +278,7 @@ public class Notebook implements NoteEventListener { List<String> getBindedInterpreterSettingsIds(String id) { Note note = getNote(id); if (note != null) { - return interpreterSettingManager.getInterpreters(note.getId()); + return interpreterSettingManager.getInterpreterBinding(note.getId()); } else { return new LinkedList<>(); } @@ -313,9 +312,10 @@ public class Notebook implements NoteEventListener { } public void moveNoteToTrash(String noteId) { - for (InterpreterSetting interpreterSetting : interpreterSettingManager - .getInterpreterSettings(noteId)) { - interpreterSettingManager.removeInterpretersForNote(interpreterSetting, "", noteId); + try { + interpreterSettingManager.setInterpreterBinding("", noteId, new ArrayList<String>()); + } catch (IOException e) { + e.printStackTrace(); } } @@ -339,7 +339,7 @@ public class Notebook implements NoteEventListener { // remove from all interpreter instance's angular object registry for (InterpreterSetting settings : interpreterSettingManager.get()) { AngularObjectRegistry registry = - settings.getInterpreterGroup(subject.getUser(), id).getAngularObjectRegistry(); + settings.getOrCreateInterpreterGroup(subject.getUser(), id).getAngularObjectRegistry(); if (registry instanceof RemoteAngularObjectRegistry) { // remove paragraph scope object for (Paragraph p : note.getParagraphs()) { @@ -374,7 +374,7 @@ public class Notebook implements NoteEventListener { } } - ResourcePoolUtils.removeResourcesBelongsToNote(id); + interpreterSettingManager.removeResourcesBelongsToNote(id); fireNoteRemoveEvent(note); @@ -521,7 +521,8 @@ public class Notebook implements NoteEventListener { SnapshotAngularObject snapshot = angularObjectSnapshot.get(name); List<InterpreterSetting> settings = interpreterSettingManager.get(); for (InterpreterSetting setting : settings) { - InterpreterGroup intpGroup = setting.getInterpreterGroup(subject.getUser(), note.getId()); + InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(subject.getUser(), + note.getId()); if (intpGroup.getId().equals(snapshot.getIntpGroupId())) { AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry(); String noteId = snapshot.getAngularObject().getNoteId(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 37138e6..bfe4566 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -93,10 +93,10 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable { // since zeppelin-0.7.0, zeppelin stores multiple results of the paragraph // see ZEPPELIN-212 - Object results; + volatile Object results; // For backward compatibility of note.json format after ZEPPELIN-212 - Object result; + volatile Object result; private Map<String, ParagraphRuntimeInfo> runtimeInfos; /** @@ -157,7 +157,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable { } @Override - public void setResult(Object results) { + public synchronized void setResult(Object results) { this.results = results; } @@ -354,7 +354,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable { } @Override - public Object getReturn() { + public synchronized Object getReturn() { return results; } @@ -401,6 +401,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable { logger.error("Can not find interpreter name " + repl); throw new RuntimeException("Can not find interpreter for " + getRequiredReplName()); } + //TODO(zjffdu) check interpreter setting status in interpreter setting itself InterpreterSetting intp = getInterpreterSettingById(repl.getInterpreterGroup().getId()); while (intp.getStatus().equals( org.apache.zeppelin.interpreter.InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES)) { @@ -560,8 +561,10 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable { if (!interpreterSettingManager.getInterpreterSettings(note.getId()).isEmpty()) { InterpreterSetting intpGroup = interpreterSettingManager.getInterpreterSettings(note.getId()).get(0); - registry = intpGroup.getInterpreterGroup(getUser(), note.getId()).getAngularObjectRegistry(); - resourcePool = intpGroup.getInterpreterGroup(getUser(), note.getId()).getResourcePool(); + registry = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId()) + .getAngularObjectRegistry(); + resourcePool = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId()) + .getResourcePool(); } List<InterpreterContextRunner> runners = new LinkedList<>(); @@ -591,8 +594,10 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable { if (!interpreterSettingManager.getInterpreterSettings(note.getId()).isEmpty()) { InterpreterSetting intpGroup = interpreterSettingManager.getInterpreterSettings(note.getId()).get(0); - registry = intpGroup.getInterpreterGroup(getUser(), note.getId()).getAngularObjectRegistry(); - resourcePool = intpGroup.getInterpreterGroup(getUser(), note.getId()).getResourcePool(); + registry = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId()) + .getAngularObjectRegistry(); + resourcePool = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId()) + .getResourcePool(); } List<InterpreterContextRunner> runners = new LinkedList<>(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/Util.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/Util.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/Util.java deleted file mode 100644 index be45b9e..0000000 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/Util.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.util; - -import org.apache.commons.lang3.StringUtils; - -import java.io.IOException; -import java.util.Properties; - -/** - * TODO(moon) : add description. - */ -public class Util { - private static final String PROJECT_PROPERTIES_VERSION_KEY = "version"; - private static final String GIT_PROPERTIES_COMMIT_ID_KEY = "git.commit.id.abbrev"; - private static final String GIT_PROPERTIES_COMMIT_TS_KEY = "git.commit.time"; - - private static Properties projectProperties; - private static Properties gitProperties; - - static { - projectProperties = new Properties(); - gitProperties = new Properties(); - try { - projectProperties.load(Util.class.getResourceAsStream("/project.properties")); - gitProperties.load(Util.class.getResourceAsStream("/git.properties")); - } catch (IOException e) { - //Fail to read project.properties - } - } - - /** - * Get Zeppelin version - * - * @return Current Zeppelin version - */ - public static String getVersion() { - return StringUtils.defaultIfEmpty(projectProperties.getProperty(PROJECT_PROPERTIES_VERSION_KEY), - StringUtils.EMPTY); - } - - /** - * Get Zeppelin Git latest commit id - * - * @return Latest Zeppelin commit id - */ - public static String getGitCommitId() { - return StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_ID_KEY), - StringUtils.EMPTY); - } - - /** - * Get Zeppelin Git latest commit timestamp - * - * @return Latest Zeppelin commit timestamp - */ - public static String getGitTimestamp() { - return StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_TS_KEY), - StringUtils.EMPTY); - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java index 305258a..c204711 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java @@ -16,14 +16,14 @@ */ package org.apache.zeppelin.helium; -import com.google.common.collect.Maps; import org.apache.commons.io.FileUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.dep.Dependency; import org.apache.zeppelin.dep.DependencyResolver; +import org.apache.zeppelin.display.AngularObjectRegistryListener; import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.mock.MockInterpreter1; import org.apache.zeppelin.interpreter.mock.MockInterpreter2; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; import org.apache.zeppelin.notebook.*; import org.apache.zeppelin.notebook.repo.VFSNotebookRepo; import org.apache.zeppelin.scheduler.Job; @@ -45,14 +45,9 @@ import java.util.List; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; -public class HeliumApplicationFactoryTest implements JobListenerFactory { - private File tmpDir; - private File notebookDir; - private ZeppelinConfiguration conf; +public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implements JobListenerFactory { + private SchedulerFactory schedulerFactory; - private DependencyResolver depResolver; - private InterpreterFactory factory; - private InterpreterSettingManager interpreterSettingManager; private VFSNotebookRepo notebookRepo; private Notebook notebook; private HeliumApplicationFactory heliumAppFactory; @@ -60,46 +55,15 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory { @Before public void setUp() throws Exception { - tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZepelinLTest_"+System.currentTimeMillis()); - tmpDir.mkdirs(); - File confDir = new File(tmpDir, "conf"); - confDir.mkdirs(); - notebookDir = new File(tmpDir + "/notebook"); - notebookDir.mkdirs(); - - File home = new File(getClass().getClassLoader().getResource("note").getFile()) // zeppelin/zeppelin-zengine/target/test-classes/note - .getParentFile() // zeppelin/zeppelin-zengine/target/test-classes - .getParentFile() // zeppelin/zeppelin-zengine/target - .getParentFile() // zeppelin/zeppelin-zengine - .getParentFile(); // zeppelin - - System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), home.getAbsolutePath()); - System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), tmpDir.getAbsolutePath() + "/conf"); - System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath()); - - conf = new ZeppelinConfiguration(); - - this.schedulerFactory = new SchedulerFactory(); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName(), "mock1,mock2"); + super.setUp(); + this.schedulerFactory = SchedulerFactory.singleton(); heliumAppFactory = new HeliumApplicationFactory(); - depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo"); - interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true)); - factory = new InterpreterFactory(conf, null, null, heliumAppFactory, depResolver, false, interpreterSettingManager); - HashMap<String, String> env = new HashMap<>(); - env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); - factory.setEnv(env); - - ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>(); - interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>())); - interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(), - Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null); - interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>()); - - ArrayList<InterpreterInfo> interpreterInfos2 = new ArrayList<>(); - interpreterInfos2.add(new InterpreterInfo(MockInterpreter2.class.getName(), "mock2", true, new HashMap<String, Object>())); - interpreterSettingManager.add("mock2", interpreterInfos2, new ArrayList<Dependency>(), new InterpreterOption(), - Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock2", null); - interpreterSettingManager.createNewSetting("mock2", "mock2", new ArrayList<Dependency>(), new InterpreterOption(), new HashMap<String, InterpreterProperty>()); + // set AppEventListener properly + for (InterpreterSetting interpreterSetting : interpreterSettingManager.get()) { + interpreterSetting.setAppEventListener(heliumAppFactory); + } SearchService search = mock(SearchService.class); notebookRepo = new VFSNotebookRepo(conf); @@ -108,7 +72,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory { conf, notebookRepo, schedulerFactory, - factory, + interpreterFactory, interpreterSettingManager, this, search, @@ -124,16 +88,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory { @After public void tearDown() throws Exception { - List<InterpreterSetting> settings = interpreterSettingManager.get(); - for (InterpreterSetting setting : settings) { - for (InterpreterGroup intpGroup : setting.getAllInterpreterGroups()) { - intpGroup.close(); - } - } - - FileUtils.deleteDirectory(tmpDir); - System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), - ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getStringValue()); + super.tearDown(); } @@ -150,7 +105,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory { "", ""); Note note1 = notebook.createNote(anonymous); - interpreterSettingManager.setInterpreters("user", note1.getId(),interpreterSettingManager.getDefaultInterpreterSettingList()); + interpreterSettingManager.setInterpreterBinding("user", note1.getId(),interpreterSettingManager.getInterpreterSettingIds()); Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); @@ -196,7 +151,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory { "", ""); Note note1 = notebook.createNote(anonymous); - interpreterSettingManager.setInterpreters("user", note1.getId(), interpreterSettingManager.getDefaultInterpreterSettingList()); + interpreterSettingManager.setInterpreterBinding("user", note1.getId(), interpreterSettingManager.getInterpreterSettingIds()); Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); @@ -236,7 +191,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory { "", ""); Note note1 = notebook.createNote(anonymous); - notebook.bindInterpretersToNote("user", note1.getId(), interpreterSettingManager.getDefaultInterpreterSettingList()); + notebook.bindInterpretersToNote("user", note1.getId(), interpreterSettingManager.getInterpreterSettingIds()); Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); @@ -297,7 +252,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory { "", ""); Note note1 = notebook.createNote(anonymous); - notebook.bindInterpretersToNote("user", note1.getId(), interpreterSettingManager.getDefaultInterpreterSettingList()); + notebook.bindInterpretersToNote("user", note1.getId(), interpreterSettingManager.getInterpreterSettingIds()); String mock1IntpSettingId = null; for (InterpreterSetting setting : notebook.getBindedInterpreterSettings(note1.getId())) { if (setting.getName().equals("mock1")) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java index 6b4932d..bdd639e 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java @@ -52,7 +52,7 @@ public class HeliumTest { // given File heliumConf = new File(tmpDir, "helium.conf"); Helium helium = new Helium(heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), - null, null, null); + null, null, null, null); assertFalse(heliumConf.exists()); // when @@ -63,14 +63,14 @@ public class HeliumTest { // then load without exception Helium heliumRestored = new Helium( - heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null); + heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null, null); } @Test public void testRestoreRegistryInstances() throws IOException, URISyntaxException, TaskRunnerException { File heliumConf = new File(tmpDir, "helium.conf"); Helium helium = new Helium( - heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null); + heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null, null); HeliumTestRegistry registry1 = new HeliumTestRegistry("r1", "r1"); HeliumTestRegistry registry2 = new HeliumTestRegistry("r2", "r2"); helium.addRegistry(registry1); @@ -105,7 +105,7 @@ public class HeliumTest { public void testRefresh() throws IOException, URISyntaxException, TaskRunnerException { File heliumConf = new File(tmpDir, "helium.conf"); Helium helium = new Helium( - heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null); + heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null, null); HeliumTestRegistry registry1 = new HeliumTestRegistry("r1", "r1"); helium.addRegistry(registry1); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java deleted file mode 100644 index aaa8864..0000000 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java +++ /dev/null @@ -1,497 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang.NullArgumentException; -import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; -import org.apache.zeppelin.dep.Dependency; -import org.apache.zeppelin.dep.DependencyResolver; -import org.apache.zeppelin.interpreter.mock.MockInterpreter1; -import org.apache.zeppelin.interpreter.mock.MockInterpreter2; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; -import org.apache.zeppelin.notebook.JobListenerFactory; -import org.apache.zeppelin.notebook.Note; -import org.apache.zeppelin.notebook.Notebook; -import org.apache.zeppelin.notebook.NotebookAuthorization; -import org.apache.zeppelin.notebook.repo.NotebookRepo; -import org.apache.zeppelin.notebook.repo.VFSNotebookRepo; -import org.apache.zeppelin.scheduler.SchedulerFactory; -import org.apache.zeppelin.search.SearchService; -import org.apache.zeppelin.user.AuthenticationInfo; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mock; -import org.quartz.SchedulerException; -import org.sonatype.aether.RepositoryException; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class InterpreterFactoryTest { - - private InterpreterFactory factory; - private InterpreterSettingManager interpreterSettingManager; - private File tmpDir; - private ZeppelinConfiguration conf; - private InterpreterContext context; - private Notebook notebook; - private NotebookRepo notebookRepo; - private DependencyResolver depResolver; - private SchedulerFactory schedulerFactory; - private NotebookAuthorization notebookAuthorization; - @Mock - private JobListenerFactory jobListenerFactory; - - @Before - public void setUp() throws Exception { - tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis()); - tmpDir.mkdirs(); - new File(tmpDir, "conf").mkdirs(); - FileUtils.copyDirectory(new File("src/test/resources/interpreter"), new File(tmpDir, "interpreter")); - - System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath()); - System.setProperty(ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName(), - "mock1,mock2,mock11,dev"); - conf = new ZeppelinConfiguration(); - schedulerFactory = new SchedulerFactory(); - depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo"); - interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true)); - factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager); - context = new InterpreterContext("note", "id", null, "title", "text", null, null, null, null, null, null, null); - - ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>(); - interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>())); - interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(), - Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null); - Map<String, InterpreterProperty> intp1Properties = new HashMap<String, InterpreterProperty>(); - intp1Properties.put("PROPERTY_1", - new InterpreterProperty("PROPERTY_1", "VALUE_1")); - intp1Properties.put("property_2", - new InterpreterProperty("property_2", "value_2")); - interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), intp1Properties); - - ArrayList<InterpreterInfo> interpreterInfos2 = new ArrayList<>(); - interpreterInfos2.add(new InterpreterInfo(MockInterpreter2.class.getName(), "mock2", true, new HashMap<String, Object>())); - interpreterSettingManager.add("mock2", interpreterInfos2, new ArrayList<Dependency>(), new InterpreterOption(), - Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock2", null); - interpreterSettingManager.createNewSetting("mock2", "mock2", new ArrayList<Dependency>(), new InterpreterOption(), new HashMap<String, InterpreterProperty>()); - - SearchService search = mock(SearchService.class); - notebookRepo = new VFSNotebookRepo(conf); - notebookAuthorization = NotebookAuthorization.init(conf); - notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, interpreterSettingManager, jobListenerFactory, search, - notebookAuthorization, null); - } - - @After - public void tearDown() throws Exception { - FileUtils.deleteDirectory(tmpDir); - } - - @Test - public void testBasic() { - List<InterpreterSetting> all = interpreterSettingManager.get(); - InterpreterSetting mock1Setting = null; - for (InterpreterSetting setting : all) { - if (setting.getName().equals("mock1")) { - mock1Setting = setting; - break; - } - } - -// mock1Setting = factory.createNewSetting("mock11", "mock1", new ArrayList<Dependency>(), new InterpreterOption(false), new Properties()); - - InterpreterGroup interpreterGroup = mock1Setting.getInterpreterGroup("user", "sharedProcess"); - factory.createInterpretersForNote(mock1Setting, "user", "sharedProcess", "session"); - - // get interpreter - assertNotNull("get Interpreter", interpreterGroup.get("session").get(0)); - - // try to get unavailable interpreter - assertNull(interpreterSettingManager.get("unknown")); - - // restart interpreter - interpreterSettingManager.restart(mock1Setting.getId()); - assertNull(mock1Setting.getInterpreterGroup("user", "sharedProcess").get("session")); - } - - @Test - public void testRemoteRepl() throws Exception { - interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true)); - ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>(); - interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>())); - interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(), - Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null); - Map<String, InterpreterProperty> intp1Properties = new HashMap<String, InterpreterProperty>(); - intp1Properties.put("PROPERTY_1", - new InterpreterProperty("PROPERTY_1", "VALUE_1")); - intp1Properties.put("property_2", new InterpreterProperty("property_2", "value_2")); - interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), intp1Properties); - factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager); - List<InterpreterSetting> all = interpreterSettingManager.get(); - InterpreterSetting mock1Setting = null; - for (InterpreterSetting setting : all) { - if (setting.getName().equals("mock1")) { - mock1Setting = setting; - break; - } - } - InterpreterGroup interpreterGroup = mock1Setting.getInterpreterGroup("user", "sharedProcess"); - factory.createInterpretersForNote(mock1Setting, "user", "sharedProcess", "session"); - // get interpreter - assertNotNull("get Interpreter", interpreterGroup.get("session").get(0)); - assertTrue(interpreterGroup.get("session").get(0) instanceof LazyOpenInterpreter); - LazyOpenInterpreter lazyInterpreter = (LazyOpenInterpreter)(interpreterGroup.get("session").get(0)); - assertTrue(lazyInterpreter.getInnerInterpreter() instanceof RemoteInterpreter); - RemoteInterpreter remoteInterpreter = (RemoteInterpreter) lazyInterpreter.getInnerInterpreter(); - assertEquals("VALUE_1", remoteInterpreter.getEnv().get("PROPERTY_1")); - assertEquals("value_2", remoteInterpreter.getProperty("property_2")); - } - - /** - * 2 users' interpreters in scoped mode. Each user has one session. Restarting user1's interpreter - * won't affect user2's interpreter - * @throws Exception - */ - @Test - public void testRestartInterpreterInScopedMode() throws Exception { - interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true)); - ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>(); - interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>())); - interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(), - Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null); - Map<String, InterpreterProperty> intp1Properties = new HashMap<String, InterpreterProperty>(); - intp1Properties.put("PROPERTY_1", - new InterpreterProperty("PROPERTY_1", "VALUE_1")); - intp1Properties.put("property_2", - new InterpreterProperty("property_2", "value_2")); - interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), intp1Properties); - factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager); - List<InterpreterSetting> all = interpreterSettingManager.get(); - InterpreterSetting mock1Setting = null; - for (InterpreterSetting setting : all) { - if (setting.getName().equals("mock1")) { - mock1Setting = setting; - break; - } - } - mock1Setting.getOption().setPerUser("scoped"); - mock1Setting.getOption().setPerNote("shared"); - // set remote as false so that we won't create new remote interpreter process - mock1Setting.getOption().setRemote(false); - mock1Setting.getOption().setHost("localhost"); - mock1Setting.getOption().setPort(2222); - InterpreterGroup interpreterGroup = mock1Setting.getInterpreterGroup("user1", "sharedProcess"); - factory.createInterpretersForNote(mock1Setting, "user1", "sharedProcess", "user1"); - factory.createInterpretersForNote(mock1Setting, "user2", "sharedProcess", "user2"); - - LazyOpenInterpreter interpreter1 = (LazyOpenInterpreter)interpreterGroup.get("user1").get(0); - interpreter1.open(); - LazyOpenInterpreter interpreter2 = (LazyOpenInterpreter)interpreterGroup.get("user2").get(0); - interpreter2.open(); - - mock1Setting.closeAndRemoveInterpreterGroup("sharedProcess", "user1"); - assertFalse(interpreter1.isOpen()); - assertTrue(interpreter2.isOpen()); - } - - /** - * 2 users' interpreters in isolated mode. Each user has one interpreterGroup. Restarting user1's interpreter - * won't affect user2's interpreter - * @throws Exception - */ - @Test - public void testRestartInterpreterInIsolatedMode() throws Exception { - interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true)); - ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>(); - interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>())); - interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(), - Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null); - Map<String, InterpreterProperty> intp1Properties = new HashMap<String, InterpreterProperty>(); - intp1Properties.put("PROPERTY_1", - new InterpreterProperty("PROPERTY_1", "VALUE_1")); - intp1Properties.put("property_2", - new InterpreterProperty("property_2", "value_2")); - interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), intp1Properties); - factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager); - List<InterpreterSetting> all = interpreterSettingManager.get(); - InterpreterSetting mock1Setting = null; - for (InterpreterSetting setting : all) { - if (setting.getName().equals("mock1")) { - mock1Setting = setting; - break; - } - } - mock1Setting.getOption().setPerUser("isolated"); - mock1Setting.getOption().setPerNote("shared"); - // set remote as false so that we won't create new remote interpreter process - mock1Setting.getOption().setRemote(false); - mock1Setting.getOption().setHost("localhost"); - mock1Setting.getOption().setPort(2222); - InterpreterGroup interpreterGroup1 = mock1Setting.getInterpreterGroup("user1", "note1"); - InterpreterGroup interpreterGroup2 = mock1Setting.getInterpreterGroup("user2", "note2"); - factory.createInterpretersForNote(mock1Setting, "user1", "note1", "shared_session"); - factory.createInterpretersForNote(mock1Setting, "user2", "note2", "shared_session"); - - LazyOpenInterpreter interpreter1 = (LazyOpenInterpreter)interpreterGroup1.get("shared_session").get(0); - interpreter1.open(); - LazyOpenInterpreter interpreter2 = (LazyOpenInterpreter)interpreterGroup2.get("shared_session").get(0); - interpreter2.open(); - - mock1Setting.closeAndRemoveInterpreterGroup("note1", "user1"); - assertFalse(interpreter1.isOpen()); - assertTrue(interpreter2.isOpen()); - } - - @Test - public void testFactoryDefaultList() throws IOException, RepositoryException { - // get default settings - List<String> all = interpreterSettingManager.getDefaultInterpreterSettingList(); - assertTrue(interpreterSettingManager.get().size() >= all.size()); - } - - @Test - public void testExceptions() throws InterpreterException, IOException, RepositoryException { - List<String> all = interpreterSettingManager.getDefaultInterpreterSettingList(); - // add setting with null option & properties expected nullArgumentException.class - try { - interpreterSettingManager.add("mock2", new ArrayList<InterpreterInfo>(), new LinkedList<Dependency>(), new InterpreterOption(false), Collections.EMPTY_MAP, "", null); - } catch(NullArgumentException e) { - assertEquals("Test null option" , e.getMessage(),new NullArgumentException("option").getMessage()); - } - try { - interpreterSettingManager.add("mock2", new ArrayList<InterpreterInfo>(), new LinkedList<Dependency>(), new InterpreterOption(false), Collections.EMPTY_MAP, "", null); - } catch (NullArgumentException e){ - assertEquals("Test null properties" , e.getMessage(),new NullArgumentException("properties").getMessage()); - } - } - - - @Test - public void testSaveLoad() throws IOException, RepositoryException { - // interpreter settings - int numInterpreters = interpreterSettingManager.get().size(); - - // check if file saved - assertTrue(new File(conf.getInterpreterSettingPath()).exists()); - - interpreterSettingManager.createNewSetting("new-mock1", "mock1", new LinkedList<Dependency>(), new InterpreterOption(false), new HashMap<String, InterpreterProperty>()); - assertEquals(numInterpreters + 1, interpreterSettingManager.get().size()); - - interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true)); - - /* - Current situation, if InterpreterSettinfRef doesn't have the key of InterpreterSetting, it would be ignored. - Thus even though interpreter.json have several interpreterSetting in that file, it would be ignored and would not be initialized from loadFromFile. - In this case, only "mock11" would be referenced from file under interpreter/mock, and "mock11" group would be initialized. - */ - // TODO(jl): Decide how to handle the know referenced interpreterSetting. - assertEquals(1, interpreterSettingManager.get().size()); - } - - @Test - public void testInterpreterSettingPropertyClass() throws IOException, RepositoryException { - // check if default interpreter reference's property type is map - Map<String, InterpreterSetting> interpreterSettingRefs = interpreterSettingManager.getAvailableInterpreterSettings(); - InterpreterSetting intpSetting = interpreterSettingRefs.get("mock1"); - Map<String, DefaultInterpreterProperty> intpProperties = - (Map<String, DefaultInterpreterProperty>) intpSetting.getProperties(); - assertTrue(intpProperties instanceof Map); - - // check if interpreter instance is saved as Properties in conf/interpreter.json file - Map<String, InterpreterProperty> properties = new HashMap<String, InterpreterProperty>(); - properties.put("key1", new InterpreterProperty("key1", "value1", "type1")); - properties.put("key2", new InterpreterProperty("key2", "value2", "type2")); - - interpreterSettingManager.createNewSetting("newMock", "mock1", new LinkedList<Dependency>(), new InterpreterOption(false), properties); - - String confFilePath = conf.getInterpreterSettingPath(); - byte[] encoded = Files.readAllBytes(Paths.get(confFilePath)); - String json = new String(encoded, "UTF-8"); - - InterpreterInfoSaving infoSaving = InterpreterInfoSaving.fromJson(json); - Map<String, InterpreterSetting> interpreterSettings = infoSaving.interpreterSettings; - for (String key : interpreterSettings.keySet()) { - InterpreterSetting setting = interpreterSettings.get(key); - if (setting.getName().equals("newMock")) { - assertEquals(setting.getProperties().toString(), properties.toString()); - } - } - } - - @Test - public void testInterpreterAliases() throws IOException, RepositoryException { - interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true)); - factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager); - final InterpreterInfo info1 = new InterpreterInfo("className1", "name1", true, null); - final InterpreterInfo info2 = new InterpreterInfo("className2", "name1", true, null); - interpreterSettingManager.add("group1", new ArrayList<InterpreterInfo>() {{ - add(info1); - }}, new ArrayList<Dependency>(), new InterpreterOption(true), Collections.EMPTY_MAP, "/path1", null); - interpreterSettingManager.add("group2", new ArrayList<InterpreterInfo>(){{ - add(info2); - }}, new ArrayList<Dependency>(), new InterpreterOption(true), Collections.EMPTY_MAP, "/path2", null); - - final InterpreterSetting setting1 = interpreterSettingManager.createNewSetting("test-group1", "group1", new ArrayList<Dependency>(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>()); - final InterpreterSetting setting2 = interpreterSettingManager.createNewSetting("test-group2", "group1", new ArrayList<Dependency>(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>()); - - interpreterSettingManager.setInterpreters("user", "note", new ArrayList<String>() {{ - add(setting1.getId()); - add(setting2.getId()); - }}); - - assertEquals("className1", factory.getInterpreter("user1", "note", "test-group1").getClassName()); - assertEquals("className1", factory.getInterpreter("user1", "note", "group1").getClassName()); - } - - @Test - public void testMultiUser() throws IOException, RepositoryException { - interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true)); - factory = new InterpreterFactory(conf, null, null, null, depResolver, true, interpreterSettingManager); - final InterpreterInfo info1 = new InterpreterInfo("className1", "name1", true, null); - interpreterSettingManager.add("group1", new ArrayList<InterpreterInfo>(){{ - add(info1); - }}, new ArrayList<Dependency>(), new InterpreterOption(true), Collections.EMPTY_MAP, "/path1", null); - - InterpreterOption perUserInterpreterOption = new InterpreterOption(true, InterpreterOption.ISOLATED, InterpreterOption.SHARED); - final InterpreterSetting setting1 = interpreterSettingManager.createNewSetting("test-group1", "group1", new ArrayList<Dependency>(), perUserInterpreterOption, new HashMap<String, InterpreterProperty>()); - - interpreterSettingManager.setInterpreters("user1", "note", new ArrayList<String>() {{ - add(setting1.getId()); - }}); - - interpreterSettingManager.setInterpreters("user2", "note", new ArrayList<String>() {{ - add(setting1.getId()); - }}); - - assertNotEquals(factory.getInterpreter("user1", "note", "test-group1"), factory.getInterpreter("user2", "note", "test-group1")); - } - - - @Test - public void testInvalidInterpreterSettingName() { - try { - interpreterSettingManager.createNewSetting("new.mock1", "mock1", new LinkedList<Dependency>(), new InterpreterOption(false), new HashMap<String, InterpreterProperty>()); - fail("expect fail because of invalid InterpreterSetting Name"); - } catch (IOException e) { - assertEquals("'.' is invalid for InterpreterSetting name.", e.getMessage()); - } - } - - - @Test - public void getEditorSetting() throws IOException, RepositoryException, SchedulerException { - List<String> intpIds = new ArrayList<>(); - for(InterpreterSetting intpSetting: interpreterSettingManager.get()) { - if (intpSetting.getName().startsWith("mock1")) { - intpIds.add(intpSetting.getId()); - } - } - Note note = notebook.createNote(intpIds, new AuthenticationInfo("anonymous")); - - Interpreter interpreter = factory.getInterpreter("user1", note.getId(), "mock11"); - // get editor setting from interpreter-setting.json - Map<String, Object> editor = interpreterSettingManager.getEditorSetting(interpreter, "user1", note.getId(), "mock11"); - assertEquals("java", editor.get("language")); - - // when interpreter is not loaded via interpreter-setting.json - // or editor setting doesn't exit - editor = interpreterSettingManager.getEditorSetting(factory.getInterpreter("user1", note.getId(), "mock1"),"user1", note.getId(), "mock1"); - assertEquals(null, editor.get("language")); - - // when interpreter is not bound to note - editor = interpreterSettingManager.getEditorSetting(factory.getInterpreter("user1", note.getId(), "mock11"),"user1", note.getId(), "mock2"); - assertEquals("text", editor.get("language")); - } - - @Test - public void registerCustomInterpreterRunner() throws IOException { - InterpreterSettingManager spyInterpreterSettingManager = spy(interpreterSettingManager); - - doNothing().when(spyInterpreterSettingManager).saveToFile(); - - ArrayList<InterpreterInfo> interpreterInfos1 = new ArrayList<>(); - interpreterInfos1.add(new InterpreterInfo("name1.class", "name1", true, Maps.<String, Object>newHashMap())); - - spyInterpreterSettingManager.add("normalGroup1", interpreterInfos1, Lists.<Dependency>newArrayList(), new InterpreterOption(true), Maps.<String, DefaultInterpreterProperty>newHashMap(), "/normalGroup1", null); - - spyInterpreterSettingManager.createNewSetting("normalGroup1", "normalGroup1", Lists.<Dependency>newArrayList(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>()); - - ArrayList<InterpreterInfo> interpreterInfos2 = new ArrayList<>(); - interpreterInfos2.add(new InterpreterInfo("name1.class", "name1", true, Maps.<String, Object>newHashMap())); - - InterpreterRunner mockInterpreterRunner = mock(InterpreterRunner.class); - - when(mockInterpreterRunner.getPath()).thenReturn("custom-linux-path.sh"); - - spyInterpreterSettingManager.add("customGroup1", interpreterInfos2, Lists.<Dependency>newArrayList(), new InterpreterOption(true), Maps.<String, DefaultInterpreterProperty>newHashMap(), "/customGroup1", mockInterpreterRunner); - - spyInterpreterSettingManager.createNewSetting("customGroup1", "customGroup1", Lists.<Dependency>newArrayList(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>()); - - spyInterpreterSettingManager.setInterpreters("anonymous", "noteCustome", spyInterpreterSettingManager.getDefaultInterpreterSettingList()); - - factory.getInterpreter("anonymous", "noteCustome", "customGroup1"); - - verify(mockInterpreterRunner, times(1)).getPath(); - } - - @Test - public void interpreterRunnerTest() { - InterpreterRunner mockInterpreterRunner = mock(InterpreterRunner.class); - String testInterpreterRunner = "relativePath.sh"; - when(mockInterpreterRunner.getPath()).thenReturn(testInterpreterRunner); // This test only for Linux - Interpreter i = factory.createRemoteRepl("path1", "sessionKey", "className", new Properties(), interpreterSettingManager.get().get(0).getId(), "userName", false, mockInterpreterRunner); - String interpreterRunner = ((RemoteInterpreter) ((LazyOpenInterpreter) i).getInnerInterpreter()).getInterpreterRunner(); - assertNotEquals(interpreterRunner, testInterpreterRunner); - - testInterpreterRunner = "/AbsolutePath.sh"; - when(mockInterpreterRunner.getPath()).thenReturn(testInterpreterRunner); - i = factory.createRemoteRepl("path1", "sessionKey", "className", new Properties(), interpreterSettingManager.get().get(0).getId(), "userName", false, mockInterpreterRunner); - interpreterRunner = ((RemoteInterpreter) ((LazyOpenInterpreter) i).getInnerInterpreter()).getInterpreterRunner(); - assertEquals(interpreterRunner, testInterpreterRunner); - } -}