/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zeppelin.interpreter.remote;

import java.util.*;

import org.apache.commons.lang3.StringUtils;
import org.apache.thrift.TException;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

/**
 * Proxy for an Interpreter instance that runs in a separate process.
 *
 * <p>Every interpreter operation borrows a Thrift {@link Client} from the
 * {@link RemoteInterpreterProcess} attached to this interpreter's group, forwards the call
 * using {@code sessionKey} and {@code className} to identify the remote instance, and hands
 * the client back afterwards, flagging it as broken when the call failed with a
 * {@link TException}.
 */
public class RemoteInterpreter extends Interpreter {
  private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class);

  private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
  private final ApplicationEventListener applicationEventListener;
  private final Gson gson = new Gson();
  private String interpreterRunner;
  private String interpreterPath;
  private String localRepoPath;
  private String className;
  private String sessionKey;
  private FormType formType;
  private boolean initialized;
  private Map<String, String> env;
  private int connectTimeout;
  private int maxPoolSize;
  private String host;
  private int port;
  private String userName;
  private Boolean isUserImpersonate;
  private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
  private String interpreterGroupName;

  /**
   * Remote interpreter that also manages (launches) the interpreter process.
   *
   * <p>The process environment is derived from {@code property} by
   * {@link #getEnvFromInterpreterProperty(Properties)}.
   */
  public RemoteInterpreter(Properties property, String sessionKey, String className,
      String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout,
      int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener,
      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
      int outputLimit, String interpreterGroupName) {
    super(property);
    this.sessionKey = sessionKey;
    this.className = className;
    initialized = false;
    this.interpreterRunner = interpreterRunner;
    this.interpreterPath = interpreterPath;
    this.localRepoPath = localRepoPath;
    env = getEnvFromInterpreterProperty(property);
    this.connectTimeout = connectTimeout;
    this.maxPoolSize = maxPoolSize;
    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
    this.applicationEventListener = appListener;
    this.userName = userName;
    this.isUserImpersonate = isUserImpersonate;
    this.outputLimit = outputLimit;
    this.interpreterGroupName = interpreterGroupName;
  }


  /**
   * Remote interpreter that connects to an already existing process at {@code host:port}.
   */
  public RemoteInterpreter(Properties property, String sessionKey, String className, String host,
      int port, String localRepoPath, int connectTimeout, int maxPoolSize,
      RemoteInterpreterProcessListener remoteInterpreterProcessListener,
      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
      int outputLimit) {
    super(property);
    this.sessionKey = sessionKey;
    this.className = className;
    initialized = false;
    this.host = host;
    this.port = port;
    this.localRepoPath = localRepoPath;
    this.connectTimeout = connectTimeout;
    this.maxPoolSize = maxPoolSize;
    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
    this.applicationEventListener = appListener;
    this.userName = userName;
    this.isUserImpersonate = isUserImpersonate;
    this.outputLimit = outputLimit;
  }


  /**
   * VisibleForTesting. Uses a fixed pool size of 10.
   *
   * <p>NOTE(review): the supplied {@code env} map is modified in place (the entries derived
   * from {@code property} are added to it) and is then kept by reference, so it must be
   * mutable and non-null.
   */
  public RemoteInterpreter(Properties property, String sessionKey, String className,
      String interpreterRunner, String interpreterPath, String localRepoPath,
      Map<String, String> env, int connectTimeout,
      RemoteInterpreterProcessListener remoteInterpreterProcessListener,
      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
    super(property);
    this.className = className;
    this.sessionKey = sessionKey;
    this.interpreterRunner = interpreterRunner;
    this.interpreterPath = interpreterPath;
    this.localRepoPath = localRepoPath;
    env.putAll(getEnvFromInterpreterProperty(property));
    this.env = env;
    this.connectTimeout = connectTimeout;
    this.maxPoolSize = 10;
    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
    this.applicationEventListener = appListener;
    this.userName = userName;
    this.isUserImpersonate = isUserImpersonate;
  }

  /**
   * Builds the environment for the interpreter process from the interpreter properties.
   *
   * <p>Properties whose key is accepted by {@link RemoteInterpreterUtils#isEnvString(String)}
   * are copied as-is. The {@code master} property and every non-empty {@code spark.*} property
   * are folded into a single {@code ZEPPELIN_SPARK_CONF} entry made of {@code --master} and
   * {@code --conf key=value} arguments.
   *
   * @param property interpreter properties
   * @return a new mutable map; always contains {@code ZEPPELIN_SPARK_CONF} (possibly empty)
   */
  private Map<String, String> getEnvFromInterpreterProperty(Properties property) {
    Map<String, String> env = new HashMap<String, String>();
    StringBuilder sparkConfBuilder = new StringBuilder();
    for (String key : property.stringPropertyNames()) {
      String value = property.getProperty(key);
      if (RemoteInterpreterUtils.isEnvString(key)) {
        env.put(key, value);
      }
      if ("master".equals(key)) {
        sparkConfBuilder.append(" --master ").append(value);
      }
      if (isSparkConf(key, value)) {
        sparkConfBuilder.append(" --conf ").append(key).append('=')
            .append(toShellFormat(value));
      }
    }
    env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
    return env;
  }

  /**
   * Quotes a property value for use on a shell command line: double quotes when the value
   * contains a single quote, single quotes otherwise.
   *
   * @throws RuntimeException if the value contains both quote characters
   */
  private String toShellFormat(String value) {
    boolean hasSingleQuote = value.contains("\'");
    boolean hasDoubleQuote = value.contains("\"");
    if (hasSingleQuote && hasDoubleQuote) {
      throw new RuntimeException("Spark property value could not contain both \" and '");
    }
    if (hasSingleQuote) {
      return "\"" + value + "\"";
    }
    return "\'" + value + "\'";
  }

  /**
   * @return true when {@code key} starts with {@code "spark."} and both key and value are
   *         non-empty
   */
  static boolean isSparkConf(String key, String value) {
    return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value);
  }

  @Override
  public String getClassName() {
    return className;
  }

  /** @return true when this instance was given the host and port of an existing process */
  private boolean connectToExistingProcess() {
    return host != null && port > 0;
  }

  /**
   * Returns the process attached to this interpreter's group, creating and attaching it on
   * first use: a {@link RemoteInterpreterRunningProcess} when host/port were supplied, a
   * {@link RemoteInterpreterManagedProcess} otherwise.
   *
   * @return the shared process, or {@code null} when no interpreter group is set yet
   */
  public RemoteInterpreterProcess getInterpreterProcess() {
    InterpreterGroup intpGroup = getInterpreterGroup();
    if (intpGroup == null) {
      return null;
    }

    synchronized (intpGroup) {
      if (intpGroup.getRemoteInterpreterProcess() == null) {
        RemoteInterpreterProcess remoteProcess;
        if (connectToExistingProcess()) {
          remoteProcess = new RemoteInterpreterRunningProcess(
              connectTimeout,
              remoteInterpreterProcessListener,
              applicationEventListener,
              host,
              port);
        } else {
          // create new remote process
          remoteProcess = new RemoteInterpreterManagedProcess(
              interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout,
              remoteInterpreterProcessListener, applicationEventListener, interpreterGroupName);
        }

        intpGroup.setRemoteInterpreterProcess(remoteProcess);
      }

      return intpGroup.getRemoteInterpreterProcess();
    }
  }

  /**
   * Borrows a Thrift client from the given process, converting any failure into an
   * {@link InterpreterException}.
   */
  private static Client acquireClient(RemoteInterpreterProcess interpreterProcess) {
    try {
      return interpreterProcess.getClient();
    } catch (Exception e) {
      throw new InterpreterException(e);
    }
  }

  /**
   * Creates the interpreter instance on the remote side. Does nothing when this proxy has
   * already been initialized.
   *
   * @throws InterpreterException if no client can be obtained or the remote call fails
   */
  public synchronized void init() {
    if (initialized) {
      return;
    }

    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();

    final InterpreterGroup interpreterGroup = getInterpreterGroup();

    interpreterProcess.setMaxPoolSize(
        Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
    String groupId = interpreterGroup.getId();

    synchronized (interpreterProcess) {
      Client client = acquireClient(interpreterProcess);

      // NOTE(review): unlike the other remote calls in this class, a TException here does not
      // flag the client as broken; kept as-is because of the TODO below.
      boolean broken = false;
      try {
        logger.info("Create remote interpreter {}", getClassName());
        if (localRepoPath != null) {
          property.put("zeppelin.interpreter.localRepo", localRepoPath);
        }

        property.put("zeppelin.interpreter.output.limit", Integer.toString(outputLimit));

        // The Thrift API takes a Map<String, String>; Properties is passed through a raw cast.
        @SuppressWarnings({"unchecked", "rawtypes"})
        Map<String, String> remoteProperties = (Map) property;
        client.createInterpreter(groupId, sessionKey, getClassName(), remoteProperties,
            userName);

        // Push angular object loaded from JSON file to remote interpreter
        if (!interpreterGroup.isAngularRegistryPushed()) {
          pushAngularObjectRegistryToRemote(client);
          interpreterGroup.setAngularRegistryPushed(true);
        }

      } catch (TException e) {
        logger.error("Failed to create interpreter: {}", getClassName(), e);
        throw new InterpreterException(e);
      } finally {
        // TODO(jongyoul): Fixed it when not all of interpreter in same interpreter group are broken
        interpreterProcess.releaseClient(client, broken);
      }
    }
    initialized = true;
  }


  /**
   * Initializes every interpreter registered under this session key, taking one reference on
   * the shared process when this proxy is not initialized yet.
   */
  @Override
  public void open() {
    InterpreterGroup interpreterGroup = getInterpreterGroup();

    synchronized (interpreterGroup) {
      // initialize all interpreters in this interpreter group
      List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
      // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however,
      // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
      // doesn't call open method, it's not open. It causes problem while running intp.close()
      // In case of Spark, this method initializes all of interpreters and init() method increases
      // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
      // other interpreters doesn't do anything because those LazyInterpreters aren't open.
      // But for now, we have to initialise all of interpreters for some reasons.
      // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
      RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
      if (!initialized) {
        // reference per session
        interpreterProcess.reference(interpreterGroup, userName, isUserImpersonate);
      }
      // Iterate over a copy because failed interpreters are removed from the live list.
      for (Interpreter intp : new ArrayList<>(interpreters)) {
        Interpreter p = intp;
        while (p instanceof WrappedInterpreter) {
          p = ((WrappedInterpreter) p).getInnerInterpreter();
        }
        try {
          ((RemoteInterpreter) p).init();
        } catch (InterpreterException e) {
          logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
              p.getClassName());
          // NOTE(review): removes the unwrapped instance, not the list element itself.
          interpreters.remove(p);
        }
      }
    }
  }

  /**
   * Closes every interpreter registered under this session key, dropping the reference on the
   * shared process when this proxy had been initialized.
   */
  @Override
  public void close() {
    InterpreterGroup interpreterGroup = getInterpreterGroup();
    synchronized (interpreterGroup) {
      // close all interpreters in this session
      List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
      // Mirrors open(): all interpreters sharing this session key are handled together.
      if (initialized) {
        // dereference per session
        getInterpreterProcess().dereference();
      }
      // Iterate over a copy because failed interpreters are removed from the live list.
      for (Interpreter intp : new ArrayList<>(interpreters)) {
        Interpreter p = intp;
        while (p instanceof WrappedInterpreter) {
          p = ((WrappedInterpreter) p).getInnerInterpreter();
        }
        try {
          ((RemoteInterpreter) p).closeInterpreter();
        } catch (InterpreterException e) {
          logger.error("Failed to close interpreter: {}. Remove it from interpreterGroup",
              p.getClassName());
          // NOTE(review): removes the unwrapped instance, not the list element itself.
          interpreters.remove(p);
        }
      }
    }
  }

  /**
   * Closes the remote counterpart of this proxy. Does nothing when it is not initialized;
   * otherwise the proxy is marked uninitialized whether or not the remote call succeeds.
   *
   * @throws InterpreterException if obtaining the client or the remote call fails
   */
  public void closeInterpreter() {
    if (!this.initialized) {
      return;
    }
    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
    Client client = null;
    boolean broken = false;
    try {
      client = interpreterProcess.getClient();
      if (client != null) {
        client.close(sessionKey, className);
      }
    } catch (TException e) {
      broken = true;
      throw new InterpreterException(e);
    } catch (Exception e1) {
      throw new InterpreterException(e1);
    } finally {
      if (client != null) {
        interpreterProcess.releaseClient(client, broken);
      }
      this.initialized = false;
    }
  }

  /**
   * Runs {@code st} on the remote interpreter and copies the returned config and GUI state
   * back into {@code context}.
   *
   * @param st      text to interpret
   * @param context paragraph context; its config and GUI are updated in place
   * @return the converted remote result
   * @throws InterpreterException if no client can be obtained or the remote call fails
   */
  @Override
  public InterpreterResult interpret(String st, InterpreterContext context) {
    logger.debug("st:\n{}", st);

    FormType form = getFormType();
    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
    Client client = acquireClient(interpreterProcess);

    boolean broken = false;
    try {
      // Done inside the try block so the borrowed client is always handed back.
      InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess
          .getInterpreterContextRunnerPool();

      List<InterpreterContextRunner> runners = context.getRunners();
      if (runners != null && !runners.isEmpty()) {
        // assume all runners in this InterpreterContext have the same note id
        String noteId = runners.get(0).getNoteId();

        interpreterContextRunnerPool.clear(noteId);
        interpreterContextRunnerPool.addAll(noteId, runners);
      }

      final GUI currentGUI = context.getGui();
      RemoteInterpreterResult remoteResult = client.interpret(
          sessionKey, className, st, convert(context));

      Map<String, Object> remoteConfig = gson.fromJson(
          remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
          }.getType());
      context.getConfig().clear();
      // Gson yields null for a null or "null" document; treat that as an empty config.
      if (remoteConfig != null) {
        context.getConfig().putAll(remoteConfig);
      }

      if (form == FormType.NATIVE) {
        // NATIVE: the remote GUI replaces the local one entirely.
        GUI remoteGui = GUI.fromJson(remoteResult.getGui());
        currentGUI.clear();
        currentGUI.setParams(remoteGui.getParams());
        currentGUI.setForms(remoteGui.getForms());
      } else if (form == FormType.SIMPLE) {
        // SIMPLE: remote forms and params are merged into the local ones.
        final Map<String, Input> currentForms = currentGUI.getForms();
        final Map<String, Object> currentParams = currentGUI.getParams();
        final GUI remoteGUI = GUI.fromJson(remoteResult.getGui());
        final Map<String, Input> remoteForms = remoteGUI.getForms();
        final Map<String, Object> remoteParams = remoteGUI.getParams();
        currentForms.putAll(remoteForms);
        currentParams.putAll(remoteParams);
      }

      return convert(remoteResult);
    } catch (TException e) {
      broken = true;
      throw new InterpreterException(e);
    } finally {
      interpreterProcess.releaseClient(client, broken);
    }
  }

  /**
   * Asks the remote interpreter to cancel the paragraph described by {@code context}.
   *
   * @throws InterpreterException if no client can be obtained or the remote call fails
   */
  @Override
  public void cancel(InterpreterContext context) {
    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
    Client client = acquireClient(interpreterProcess);

    boolean broken = false;
    try {
      client.cancel(sessionKey, className, convert(context));
    } catch (TException e) {
      broken = true;
      throw new InterpreterException(e);
    } finally {
      interpreterProcess.releaseClient(client, broken);
    }
  }

  /**
   * Opens the interpreter and returns its form type, fetching it from the remote side on the
   * first call and caching it afterwards.
   *
   * @throws InterpreterException if no client can be obtained or the remote call fails
   */
  @Override
  public FormType getFormType() {
    open();

    if (formType != null) {
      return formType;
    }

    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
    Client client = acquireClient(interpreterProcess);

    boolean broken = false;
    try {
      formType = FormType.valueOf(client.getFormType(sessionKey, className));
      return formType;
    } catch (TException e) {
      broken = true;
      throw new InterpreterException(e);
    } finally {
      interpreterProcess.releaseClient(client, broken);
    }
  }

  /**
   * @return the progress reported by the remote interpreter, or 0 when there is no process or
   *         it is not running
   * @throws InterpreterException if no client can be obtained or the remote call fails
   */
  @Override
  public int getProgress(InterpreterContext context) {
    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
    if (interpreterProcess == null || !interpreterProcess.isRunning()) {
      return 0;
    }

    Client client = acquireClient(interpreterProcess);

    boolean broken = false;
    try {
      return client.getProgress(sessionKey, className, convert(context));
    } catch (TException e) {
      broken = true;
      throw new InterpreterException(e);
    } finally {
      interpreterProcess.releaseClient(client, broken);
    }
  }


  /**
   * Requests completion candidates for {@code buf} at {@code cursor} from the remote
   * interpreter.
   *
   * @throws InterpreterException if no client can be obtained or the remote call fails
   */
  @Override
  public List<InterpreterCompletion> completion(String buf, int cursor,
      InterpreterContext interpreterContext) {
    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
    Client client = acquireClient(interpreterProcess);

    boolean broken = false;
    try {
      return client.completion(sessionKey, className, buf, cursor,
          convert(interpreterContext));
    } catch (TException e) {
      broken = true;
      throw new InterpreterException(e);
    } finally {
      interpreterProcess.releaseClient(client, broken);
    }
  }

  /**
   * @return the remote scheduler keyed by this class name, the session key and the process
   *         hash code, or {@code null} when no process is available
   */
  @Override
  public Scheduler getScheduler() {
    int maxConcurrency = maxPoolSize;
    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
    if (interpreterProcess == null) {
      return null;
    }
    return SchedulerFactory.singleton().createOrGetRemoteScheduler(
        RemoteInterpreter.class.getName() + sessionKey + interpreterProcess.hashCode(),
        sessionKey, interpreterProcess, maxConcurrency);
  }

  private String getInterpreterGroupKey(InterpreterGroup interpreterGroup) {
    return interpreterGroup.getId();
  }

  /** Converts a local context into its Thrift form; config and runners are sent as JSON. */
  private RemoteInterpreterContext convert(InterpreterContext ic) {
    return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), ic.getReplName(),
        ic.getParagraphTitle(), ic.getParagraphText(), ic.getAuthenticationInfo().toJson(),
        gson.toJson(ic.getConfig()), ic.getGui().toJson(), gson.toJson(ic.getRunners()));
  }

  /** Converts a Thrift result into an {@link InterpreterResult}, message by message. */
  private InterpreterResult convert(RemoteInterpreterResult result) {
    InterpreterResult r = new InterpreterResult(
        InterpreterResult.Code.valueOf(result.getCode()));

    for (RemoteInterpreterResultMessage m : result.getMsg()) {
      r.add(InterpreterResult.Type.valueOf(m.getType()), m.getData());
    }

    return r;
  }

  /**
   * Push local angular object registry to
   * remote interpreter. This method should be
   * called ONLY inside the init() method.
   *
   * @param client client used for the push; it is not released here
   * @throws TException if the remote call fails
   */
  void pushAngularObjectRegistryToRemote(Client client) throws TException {
    final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup()
        .getAngularObjectRegistry();

    if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) {
      final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry
          .getRegistry();

      logger.info("Push local angular object registry from ZeppelinServer to"
          + " remote interpreter group {}", this.getInterpreterGroup().getId());

      final java.lang.reflect.Type registryType = new TypeToken<Map<String,
          Map<String, AngularObject>>>() {
      }.getType();

      client.angularRegistryPush(gson.toJson(registry, registryType));
    }
  }

  /** @return the live environment map (not a copy); may be {@code null} */
  public Map<String, String> getEnv() {
    return env;
  }

  /** Adds all entries of {@code env} to this interpreter's environment, creating it if needed. */
  public void addEnv(Map<String, String> env) {
    if (this.env == null) {
      this.env = new HashMap<>();
    }
    this.env.putAll(env);
  }

  //Only for test
  public String getInterpreterRunner() {
    return interpreterRunner;
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zeppelin.interpreter.remote;

import org.apache.commons.exec.*;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;

/**
 * This class manages start / stop of remote interpreter process.
 *
 * <p>The process is launched asynchronously through commons-exec with this object registered
 * as the {@link ExecuteResultHandler}, so {@link #onProcessComplete(int)} and
 * {@link #onProcessFailed(ExecuteException)} are invoked from another thread than the one
 * calling {@link #start(String, Boolean)}.
 */
public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
    implements ExecuteResultHandler {
  private static final Logger logger = LoggerFactory.getLogger(
      RemoteInterpreterManagedProcess.class);

  /** Pause between two probes of the interpreter endpoint while starting, in milliseconds. */
  private static final long ENDPOINT_POLL_INTERVAL_MS = 500;

  private final String interpreterRunner;

  private DefaultExecutor executor;
  private ExecuteWatchdog watchdog;
  // Written by the commons-exec handler thread, read by start()/isRunning(): must be volatile.
  volatile boolean running = false;
  private int port = -1;
  private final String interpreterDir;
  private final String localRepoDir;
  private final String interpreterGroupName;

  private Map<String, String> env;

  public RemoteInterpreterManagedProcess(
      String intpRunner,
      String intpDir,
      String localRepoDir,
      Map<String, String> env,
      int connectTimeout,
      RemoteInterpreterProcessListener listener,
      ApplicationEventListener appListener,
      String interpreterGroupName) {
    super(new RemoteInterpreterEventPoller(listener, appListener),
        connectTimeout);
    this.interpreterRunner = intpRunner;
    this.env = env;
    this.interpreterDir = intpDir;
    this.localRepoDir = localRepoDir;
    this.interpreterGroupName = interpreterGroupName;
  }

  RemoteInterpreterManagedProcess(String intpRunner,
                                  String intpDir,
                                  String localRepoDir,
                                  Map<String, String> env,
                                  RemoteInterpreterEventPoller remoteInterpreterEventPoller,
                                  int connectTimeout,
                                  String interpreterGroupName) {
    super(remoteInterpreterEventPoller,
        connectTimeout);
    this.interpreterRunner = intpRunner;
    this.env = env;
    this.interpreterDir = intpDir;
    this.localRepoDir = localRepoDir;
    this.interpreterGroupName = interpreterGroupName;
  }

  @Override
  public String getHost() {
    return "localhost";
  }

  @Override
  public int getPort() {
    return port;
  }

  /**
   * Launches the interpreter runner on a free local port and waits, for at most the connect
   * timeout, until that port accepts connections.
   *
   * @param userName          passed to the runner with {@code -u} when impersonation is
   *                          enabled and the user is not {@code "anonymous"}
   * @param isUserImpersonate whether to run as {@code userName}; {@code null} means no
   * @throws InterpreterException if no port can be found, the process cannot be launched, or
   *                              it terminates while being waited for (the message then holds
   *                              the captured process output)
   */
  @Override
  public void start(String userName, Boolean isUserImpersonate) {
    // start server process
    try {
      port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
    } catch (IOException e1) {
      throw new InterpreterException(e1);
    }

    CommandLine cmdLine = buildCommandLine(userName, isUserImpersonate);

    executor = new DefaultExecutor();

    ByteArrayOutputStream cmdOut = new ByteArrayOutputStream();
    ProcessLogOutputStream processOutput = new ProcessLogOutputStream(logger);
    processOutput.setOutputStream(cmdOut);

    executor.setStreamHandler(new PumpStreamHandler(processOutput));
    watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
    executor.setWatchdog(watchdog);

    try {
      Map<String, String> procEnv = EnvironmentUtils.getProcEnvironment();
      if (env != null) {
        procEnv.putAll(env);
      }

      logger.info("Run interpreter process {}", cmdLine);
      // Raise the flag BEFORE launching: the completion callbacks run on another thread and
      // may clear it at any time after execute() returns; setting it afterwards could
      // overwrite their update and report a dead process as running.
      running = true;
      executor.execute(cmdLine, procEnv, this);
    } catch (IOException e) {
      running = false;
      throw new InterpreterException(e);
    } catch (RuntimeException e) {
      running = false;
      throw e;
    }

    waitUntilAccessible(cmdOut);
    // Startup is over: stop capturing the process output in memory.
    processOutput.setOutputStream(null);
  }

  /** Assembles the runner command line: {@code -d dir -p port [-u user] -l repo -g group}. */
  private CommandLine buildCommandLine(String userName, Boolean isUserImpersonate) {
    CommandLine cmdLine = CommandLine.parse(interpreterRunner);
    cmdLine.addArgument("-d", false);
    cmdLine.addArgument(interpreterDir, false);
    cmdLine.addArgument("-p", false);
    cmdLine.addArgument(Integer.toString(port), false);
    // Null-safe: a null Boolean or user name simply disables impersonation.
    if (Boolean.TRUE.equals(isUserImpersonate)
        && userName != null && !"anonymous".equals(userName)) {
      cmdLine.addArgument("-u", false);
      cmdLine.addArgument(userName, false);
    }
    cmdLine.addArgument("-l", false);
    cmdLine.addArgument(localRepoDir, false);
    cmdLine.addArgument("-g", false);
    cmdLine.addArgument(interpreterGroupName, false);
    return cmdLine;
  }

  /**
   * Polls the interpreter port until it is reachable, the connect timeout elapses, or the
   * calling thread is interrupted (the interrupt flag is then restored).
   *
   * @param cmdOut output captured from the process so far, used as the error message
   * @throws InterpreterException if the process has terminated in the meantime
   */
  private void waitUntilAccessible(ByteArrayOutputStream cmdOut) {
    long startTime = System.currentTimeMillis();
    while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
      if (!running) {
        throw new InterpreterException(new String(cmdOut.toByteArray()));
      }

      boolean accessible = false;
      try {
        accessible = RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port);
      } catch (Exception e) {
        logger.debug("Remote interpreter not yet accessible at localhost:{}", port);
      }
      if (accessible) {
        return;
      }

      // Sleep after every unsuccessful probe, including a failed one, to avoid spinning.
      try {
        Thread.sleep(ENDPOINT_POLL_INTERVAL_MS);
      } catch (InterruptedException e) {
        logger.error("Exception in RemoteInterpreterProcess while synchronized reference "
            + "Thread.sleep", e);
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  /** Kills the process if it is still running and drops the executor and watchdog. */
  public void stop() {
    if (isRunning()) {
      logger.info("kill interpreter process");
      watchdog.destroyProcess();
    }

    executor = null;
    watchdog = null;
    running = false;
    logger.info("Remote process terminated");
  }

  @Override
  public void onProcessComplete(int exitValue) {
    logger.info("Interpreter process exited {}", exitValue);
    running = false;

  }

  @Override
  public void onProcessFailed(ExecuteException e) {
    // Pass the message for the placeholder and the exception itself for the stack trace.
    logger.info("Interpreter process failed {}", e.getMessage(), e);
    running = false;
  }

  public boolean isRunning() {
    return running;
  }

  /**
   * Logs every line written by the process at debug level and, while a secondary stream is
   * set, also copies the raw bytes of array writes to it.
   */
  private static class ProcessLogOutputStream extends LogOutputStream {

    private final Logger logger;
    // Guarded by "this".
    private OutputStream out;

    public ProcessLogOutputStream(Logger logger) {
      this.logger = logger;
    }

    @Override
    protected void processLine(String s, int i) {
      this.logger.debug(s);
    }

    @Override
    public void write(byte [] b) throws IOException {
      // Delegate to the three-argument form, which does the logging and the copy. Calling
      // super.write(b) here would dispatch to that same override and the bytes would be
      // copied to the secondary stream twice.
      write(b, 0, b.length);
    }

    @Override
    public void write(byte [] b, int offset, int len) throws IOException {
      super.write(b, offset, len);

      synchronized (this) {
        if (out != null) {
          out.write(b, offset, len);
        }
      }
    }

    /** Sets the secondary stream; {@code null} stops the copying. */
    public void setOutputStream(OutputStream out) {
      synchronized (this) {
        this.out = out;
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java new file mode 100644 index 0000000..bb176be --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zeppelin.interpreter.remote; + +import org.apache.zeppelin.helium.ApplicationEventListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class connects to existing process + */ +public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess { + private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class); + private final String host; + private final int port; + + public RemoteInterpreterRunningProcess( + int connectTimeout, + RemoteInterpreterProcessListener listener, + ApplicationEventListener appListener, + String host, + int port + ) { + super(connectTimeout, listener, appListener); + this.host = host; + this.port = port; + } + + @Override + public String getHost() { + return host; + } + + @Override + public int getPort() { + return port; + } + + @Override + public void start(String userName, Boolean isUserImpersonate) { + // assume process is externally managed. nothing to do + } + + @Override + public void stop() { + // assume process is externally managed. 
nothing to do + } + + @Override + public boolean isRunning() { + return RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort()); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java index bc71d89..1505db9 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.notebook; import org.apache.zeppelin.helium.HeliumPackage; +import org.apache.zeppelin.interpreter.InterpreterGroup; /** * Current state of application http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 4a93d08..198e278 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -41,6 +41,7 @@ import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.notebook.repo.NotebookRepo; import org.apache.zeppelin.notebook.utility.IdHashes; +import org.apache.zeppelin.resource.ResourcePoolUtils; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.search.SearchService; @@ -125,6 +126,11 @@ public class 
Note implements ParagraphJobListener, JsonSerializable { id = IdHashes.generateId(); } + private String getDefaultInterpreterName() { + InterpreterSetting setting = interpreterSettingManager.getDefaultInterpreterSetting(getId()); + return null != setting ? setting.getName() : StringUtils.EMPTY; + } + public boolean isPersonalizedMode() { Object v = getConfig().get("personalizedMode"); return null != v && "true".equals(v); @@ -379,7 +385,7 @@ public class Note implements ParagraphJobListener, JsonSerializable { */ public Paragraph removeParagraph(String user, String paragraphId) { removeAllAngularObjectInParagraph(user, paragraphId); - interpreterSettingManager.removeResourcesBelongsToParagraph(getId(), paragraphId); + ResourcePoolUtils.removeResourcesBelongsToParagraph(getId(), paragraphId); synchronized (paragraphs) { Iterator<Paragraph> i = paragraphs.iterator(); while (i.hasNext()) { @@ -684,7 +690,7 @@ public class Note implements ParagraphJobListener, JsonSerializable { } for (InterpreterSetting setting : settings) { - InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id); + InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id); AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry(); angularObjects.put(intpGroup.getId(), registry.getAllWithGlobal(id)); } @@ -699,7 +705,7 @@ public class Note implements ParagraphJobListener, JsonSerializable { } for (InterpreterSetting setting : settings) { - InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id); + InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id); AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry(); if (registry instanceof RemoteAngularObjectRegistry) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java ---------------------------------------------------------------------- diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index fd3111b..a0c1dff 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -60,6 +60,7 @@ import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.notebook.repo.NotebookRepo; import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision; import org.apache.zeppelin.notebook.repo.NotebookRepoSync; +import org.apache.zeppelin.resource.ResourcePoolUtils; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.SchedulerFactory; import org.apache.zeppelin.search.SearchService; @@ -139,7 +140,7 @@ public class Notebook implements NoteEventListener { Preconditions.checkNotNull(subject, "AuthenticationInfo should not be null"); Note note; if (conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING)) { - note = createNote(interpreterSettingManager.getInterpreterSettingIds(), subject); + note = createNote(interpreterSettingManager.getDefaultInterpreterSettingList(), subject); } else { note = createNote(null, subject); } @@ -269,8 +270,8 @@ public class Notebook implements NoteEventListener { } } - interpreterSettingManager.setInterpreterBinding(user, note.getId(), interpreterSettingIds); - // comment out while note.getNoteReplLoader().setInterpreterBinding(...) do the same + interpreterSettingManager.setInterpreters(user, note.getId(), interpreterSettingIds); + // comment out while note.getNoteReplLoader().setInterpreters(...) 
do the same // replFactory.putNoteInterpreterSettingBinding(id, interpreterSettingIds); } } @@ -278,7 +279,7 @@ public class Notebook implements NoteEventListener { List<String> getBindedInterpreterSettingsIds(String id) { Note note = getNote(id); if (note != null) { - return interpreterSettingManager.getInterpreterBinding(note.getId()); + return interpreterSettingManager.getInterpreters(note.getId()); } else { return new LinkedList<>(); } @@ -312,10 +313,9 @@ public class Notebook implements NoteEventListener { } public void moveNoteToTrash(String noteId) { - try { - interpreterSettingManager.setInterpreterBinding("", noteId, new ArrayList<String>()); - } catch (IOException e) { - e.printStackTrace(); + for (InterpreterSetting interpreterSetting : interpreterSettingManager + .getInterpreterSettings(noteId)) { + interpreterSettingManager.removeInterpretersForNote(interpreterSetting, "", noteId); } } @@ -339,7 +339,7 @@ public class Notebook implements NoteEventListener { // remove from all interpreter instance's angular object registry for (InterpreterSetting settings : interpreterSettingManager.get()) { AngularObjectRegistry registry = - settings.getOrCreateInterpreterGroup(subject.getUser(), id).getAngularObjectRegistry(); + settings.getInterpreterGroup(subject.getUser(), id).getAngularObjectRegistry(); if (registry instanceof RemoteAngularObjectRegistry) { // remove paragraph scope object for (Paragraph p : note.getParagraphs()) { @@ -374,7 +374,7 @@ public class Notebook implements NoteEventListener { } } - interpreterSettingManager.removeResourcesBelongsToNote(id); + ResourcePoolUtils.removeResourcesBelongsToNote(id); fireNoteRemoveEvent(note); @@ -521,8 +521,7 @@ public class Notebook implements NoteEventListener { SnapshotAngularObject snapshot = angularObjectSnapshot.get(name); List<InterpreterSetting> settings = interpreterSettingManager.get(); for (InterpreterSetting setting : settings) { - InterpreterGroup intpGroup = 
setting.getOrCreateInterpreterGroup(subject.getUser(), - note.getId()); + InterpreterGroup intpGroup = setting.getInterpreterGroup(subject.getUser(), note.getId()); if (intpGroup.getId().equals(snapshot.getIntpGroupId())) { AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry(); String noteId = snapshot.getAngularObject().getNoteId(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index bfe4566..37138e6 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -93,10 +93,10 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable { // since zeppelin-0.7.0, zeppelin stores multiple results of the paragraph // see ZEPPELIN-212 - volatile Object results; + Object results; // For backward compatibility of note.json format after ZEPPELIN-212 - volatile Object result; + Object result; private Map<String, ParagraphRuntimeInfo> runtimeInfos; /** @@ -157,7 +157,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable { } @Override - public synchronized void setResult(Object results) { + public void setResult(Object results) { this.results = results; } @@ -354,7 +354,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable { } @Override - public synchronized Object getReturn() { + public Object getReturn() { return results; } @@ -401,7 +401,6 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable { logger.error("Can not find interpreter name " + repl); throw new RuntimeException("Can not find interpreter for " + getRequiredReplName()); } - 
//TODO(zjffdu) check interpreter setting status in interpreter setting itself InterpreterSetting intp = getInterpreterSettingById(repl.getInterpreterGroup().getId()); while (intp.getStatus().equals( org.apache.zeppelin.interpreter.InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES)) { @@ -561,10 +560,8 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable { if (!interpreterSettingManager.getInterpreterSettings(note.getId()).isEmpty()) { InterpreterSetting intpGroup = interpreterSettingManager.getInterpreterSettings(note.getId()).get(0); - registry = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId()) - .getAngularObjectRegistry(); - resourcePool = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId()) - .getResourcePool(); + registry = intpGroup.getInterpreterGroup(getUser(), note.getId()).getAngularObjectRegistry(); + resourcePool = intpGroup.getInterpreterGroup(getUser(), note.getId()).getResourcePool(); } List<InterpreterContextRunner> runners = new LinkedList<>(); @@ -594,10 +591,8 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable { if (!interpreterSettingManager.getInterpreterSettings(note.getId()).isEmpty()) { InterpreterSetting intpGroup = interpreterSettingManager.getInterpreterSettings(note.getId()).get(0); - registry = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId()) - .getAngularObjectRegistry(); - resourcePool = intpGroup.getOrCreateInterpreterGroup(getUser(), note.getId()) - .getResourcePool(); + registry = intpGroup.getInterpreterGroup(getUser(), note.getId()).getAngularObjectRegistry(); + resourcePool = intpGroup.getInterpreterGroup(getUser(), note.getId()).getResourcePool(); } List<InterpreterContextRunner> runners = new LinkedList<>(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/Util.java ---------------------------------------------------------------------- diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/Util.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/Util.java new file mode 100644 index 0000000..be45b9e --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/Util.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.util; + +import org.apache.commons.lang3.StringUtils; + +import java.io.IOException; +import java.util.Properties; + +/** + * TODO(moon) : add description. 
+ */ +public class Util { + private static final String PROJECT_PROPERTIES_VERSION_KEY = "version"; + private static final String GIT_PROPERTIES_COMMIT_ID_KEY = "git.commit.id.abbrev"; + private static final String GIT_PROPERTIES_COMMIT_TS_KEY = "git.commit.time"; + + private static Properties projectProperties; + private static Properties gitProperties; + + static { + projectProperties = new Properties(); + gitProperties = new Properties(); + try { + projectProperties.load(Util.class.getResourceAsStream("/project.properties")); + gitProperties.load(Util.class.getResourceAsStream("/git.properties")); + } catch (IOException e) { + //Fail to read project.properties + } + } + + /** + * Get Zeppelin version + * + * @return Current Zeppelin version + */ + public static String getVersion() { + return StringUtils.defaultIfEmpty(projectProperties.getProperty(PROJECT_PROPERTIES_VERSION_KEY), + StringUtils.EMPTY); + } + + /** + * Get Zeppelin Git latest commit id + * + * @return Latest Zeppelin commit id + */ + public static String getGitCommitId() { + return StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_ID_KEY), + StringUtils.EMPTY); + } + + /** + * Get Zeppelin Git latest commit timestamp + * + * @return Latest Zeppelin commit timestamp + */ + public static String getGitTimestamp() { + return StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_TS_KEY), + StringUtils.EMPTY); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java index c204711..305258a 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java 
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java @@ -16,14 +16,14 @@ */ package org.apache.zeppelin.helium; +import com.google.common.collect.Maps; import org.apache.commons.io.FileUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.dep.Dependency; import org.apache.zeppelin.dep.DependencyResolver; -import org.apache.zeppelin.display.AngularObjectRegistryListener; import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.mock.MockInterpreter1; import org.apache.zeppelin.interpreter.mock.MockInterpreter2; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; import org.apache.zeppelin.notebook.*; import org.apache.zeppelin.notebook.repo.VFSNotebookRepo; import org.apache.zeppelin.scheduler.Job; @@ -45,9 +45,14 @@ import java.util.List; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; -public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implements JobListenerFactory { - +public class HeliumApplicationFactoryTest implements JobListenerFactory { + private File tmpDir; + private File notebookDir; + private ZeppelinConfiguration conf; private SchedulerFactory schedulerFactory; + private DependencyResolver depResolver; + private InterpreterFactory factory; + private InterpreterSettingManager interpreterSettingManager; private VFSNotebookRepo notebookRepo; private Notebook notebook; private HeliumApplicationFactory heliumAppFactory; @@ -55,15 +60,46 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implem @Before public void setUp() throws Exception { - System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName(), "mock1,mock2"); - super.setUp(); + tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZepelinLTest_"+System.currentTimeMillis()); + tmpDir.mkdirs(); + File confDir = new File(tmpDir, "conf"); + 
confDir.mkdirs(); + notebookDir = new File(tmpDir + "/notebook"); + notebookDir.mkdirs(); + + File home = new File(getClass().getClassLoader().getResource("note").getFile()) // zeppelin/zeppelin-zengine/target/test-classes/note + .getParentFile() // zeppelin/zeppelin-zengine/target/test-classes + .getParentFile() // zeppelin/zeppelin-zengine/target + .getParentFile() // zeppelin/zeppelin-zengine + .getParentFile(); // zeppelin + + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), home.getAbsolutePath()); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), tmpDir.getAbsolutePath() + "/conf"); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath()); + + conf = new ZeppelinConfiguration(); + + this.schedulerFactory = new SchedulerFactory(); - this.schedulerFactory = SchedulerFactory.singleton(); heliumAppFactory = new HeliumApplicationFactory(); - // set AppEventListener properly - for (InterpreterSetting interpreterSetting : interpreterSettingManager.get()) { - interpreterSetting.setAppEventListener(heliumAppFactory); - } + depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo"); + interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true)); + factory = new InterpreterFactory(conf, null, null, heliumAppFactory, depResolver, false, interpreterSettingManager); + HashMap<String, String> env = new HashMap<>(); + env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); + factory.setEnv(env); + + ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>(); + interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>())); + interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(), + Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", 
null); + interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>()); + + ArrayList<InterpreterInfo> interpreterInfos2 = new ArrayList<>(); + interpreterInfos2.add(new InterpreterInfo(MockInterpreter2.class.getName(), "mock2", true, new HashMap<String, Object>())); + interpreterSettingManager.add("mock2", interpreterInfos2, new ArrayList<Dependency>(), new InterpreterOption(), + Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock2", null); + interpreterSettingManager.createNewSetting("mock2", "mock2", new ArrayList<Dependency>(), new InterpreterOption(), new HashMap<String, InterpreterProperty>()); SearchService search = mock(SearchService.class); notebookRepo = new VFSNotebookRepo(conf); @@ -72,7 +108,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implem conf, notebookRepo, schedulerFactory, - interpreterFactory, + factory, interpreterSettingManager, this, search, @@ -88,7 +124,16 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implem @After public void tearDown() throws Exception { - super.tearDown(); + List<InterpreterSetting> settings = interpreterSettingManager.get(); + for (InterpreterSetting setting : settings) { + for (InterpreterGroup intpGroup : setting.getAllInterpreterGroups()) { + intpGroup.close(); + } + } + + FileUtils.deleteDirectory(tmpDir); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), + ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getStringValue()); } @@ -105,7 +150,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implem "", ""); Note note1 = notebook.createNote(anonymous); - interpreterSettingManager.setInterpreterBinding("user", note1.getId(),interpreterSettingManager.getInterpreterSettingIds()); + interpreterSettingManager.setInterpreters("user", 
note1.getId(),interpreterSettingManager.getDefaultInterpreterSettingList()); Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); @@ -151,7 +196,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implem "", ""); Note note1 = notebook.createNote(anonymous); - interpreterSettingManager.setInterpreterBinding("user", note1.getId(), interpreterSettingManager.getInterpreterSettingIds()); + interpreterSettingManager.setInterpreters("user", note1.getId(), interpreterSettingManager.getDefaultInterpreterSettingList()); Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); @@ -191,7 +236,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implem "", ""); Note note1 = notebook.createNote(anonymous); - notebook.bindInterpretersToNote("user", note1.getId(), interpreterSettingManager.getInterpreterSettingIds()); + notebook.bindInterpretersToNote("user", note1.getId(), interpreterSettingManager.getDefaultInterpreterSettingList()); Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); @@ -252,7 +297,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implem "", ""); Note note1 = notebook.createNote(anonymous); - notebook.bindInterpretersToNote("user", note1.getId(), interpreterSettingManager.getInterpreterSettingIds()); + notebook.bindInterpretersToNote("user", note1.getId(), interpreterSettingManager.getDefaultInterpreterSettingList()); String mock1IntpSettingId = null; for (InterpreterSetting setting : notebook.getBindedInterpreterSettings(note1.getId())) { if (setting.getName().equals("mock1")) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java index bdd639e..6b4932d 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java @@ -52,7 +52,7 @@ public class HeliumTest { // given File heliumConf = new File(tmpDir, "helium.conf"); Helium helium = new Helium(heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), - null, null, null, null); + null, null, null); assertFalse(heliumConf.exists()); // when @@ -63,14 +63,14 @@ public class HeliumTest { // then load without exception Helium heliumRestored = new Helium( - heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null, null); + heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null); } @Test public void testRestoreRegistryInstances() throws IOException, URISyntaxException, TaskRunnerException { File heliumConf = new File(tmpDir, "helium.conf"); Helium helium = new Helium( - heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null, null); + heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null); HeliumTestRegistry registry1 = new HeliumTestRegistry("r1", "r1"); HeliumTestRegistry registry2 = new HeliumTestRegistry("r2", "r2"); helium.addRegistry(registry1); @@ -105,7 +105,7 @@ public class HeliumTest { public void testRefresh() throws IOException, URISyntaxException, TaskRunnerException { File heliumConf = new File(tmpDir, "helium.conf"); Helium helium = new Helium( - heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null, null); + heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath(), null, null, null); HeliumTestRegistry registry1 = new HeliumTestRegistry("r1", "r1"); helium.addRegistry(registry1); 
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java new file mode 100644 index 0000000..aaa8864 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java @@ -0,0 +1,497 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.interpreter; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.NullArgumentException; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.zeppelin.dep.Dependency; +import org.apache.zeppelin.dep.DependencyResolver; +import org.apache.zeppelin.interpreter.mock.MockInterpreter1; +import org.apache.zeppelin.interpreter.mock.MockInterpreter2; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; +import org.apache.zeppelin.notebook.JobListenerFactory; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.Notebook; +import org.apache.zeppelin.notebook.NotebookAuthorization; +import org.apache.zeppelin.notebook.repo.NotebookRepo; +import org.apache.zeppelin.notebook.repo.VFSNotebookRepo; +import org.apache.zeppelin.scheduler.SchedulerFactory; +import org.apache.zeppelin.search.SearchService; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.quartz.SchedulerException; +import org.sonatype.aether.RepositoryException; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doNothing; +import static 
org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class InterpreterFactoryTest { + + private InterpreterFactory factory; + private InterpreterSettingManager interpreterSettingManager; + private File tmpDir; + private ZeppelinConfiguration conf; + private InterpreterContext context; + private Notebook notebook; + private NotebookRepo notebookRepo; + private DependencyResolver depResolver; + private SchedulerFactory schedulerFactory; + private NotebookAuthorization notebookAuthorization; + @Mock + private JobListenerFactory jobListenerFactory; + + @Before + public void setUp() throws Exception { + tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis()); + tmpDir.mkdirs(); + new File(tmpDir, "conf").mkdirs(); + FileUtils.copyDirectory(new File("src/test/resources/interpreter"), new File(tmpDir, "interpreter")); + + System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath()); + System.setProperty(ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName(), + "mock1,mock2,mock11,dev"); + conf = new ZeppelinConfiguration(); + schedulerFactory = new SchedulerFactory(); + depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo"); + interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true)); + factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager); + context = new InterpreterContext("note", "id", null, "title", "text", null, null, null, null, null, null, null); + + ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>(); + interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>())); + interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new 
InterpreterOption(), + Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null); + Map<String, InterpreterProperty> intp1Properties = new HashMap<String, InterpreterProperty>(); + intp1Properties.put("PROPERTY_1", + new InterpreterProperty("PROPERTY_1", "VALUE_1")); + intp1Properties.put("property_2", + new InterpreterProperty("property_2", "value_2")); + interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), intp1Properties); + + ArrayList<InterpreterInfo> interpreterInfos2 = new ArrayList<>(); + interpreterInfos2.add(new InterpreterInfo(MockInterpreter2.class.getName(), "mock2", true, new HashMap<String, Object>())); + interpreterSettingManager.add("mock2", interpreterInfos2, new ArrayList<Dependency>(), new InterpreterOption(), + Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock2", null); + interpreterSettingManager.createNewSetting("mock2", "mock2", new ArrayList<Dependency>(), new InterpreterOption(), new HashMap<String, InterpreterProperty>()); + + SearchService search = mock(SearchService.class); + notebookRepo = new VFSNotebookRepo(conf); + notebookAuthorization = NotebookAuthorization.init(conf); + notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, interpreterSettingManager, jobListenerFactory, search, + notebookAuthorization, null); + } + + @After + public void tearDown() throws Exception { + FileUtils.deleteDirectory(tmpDir); + } + + @Test + public void testBasic() { + List<InterpreterSetting> all = interpreterSettingManager.get(); + InterpreterSetting mock1Setting = null; + for (InterpreterSetting setting : all) { + if (setting.getName().equals("mock1")) { + mock1Setting = setting; + break; + } + } + +// mock1Setting = factory.createNewSetting("mock11", "mock1", new ArrayList<Dependency>(), new InterpreterOption(false), new Properties()); + + InterpreterGroup interpreterGroup = mock1Setting.getInterpreterGroup("user", "sharedProcess"); + 
factory.createInterpretersForNote(mock1Setting, "user", "sharedProcess", "session"); + + // get interpreter + assertNotNull("get Interpreter", interpreterGroup.get("session").get(0)); + + // try to get unavailable interpreter + assertNull(interpreterSettingManager.get("unknown")); + + // restart interpreter + interpreterSettingManager.restart(mock1Setting.getId()); + assertNull(mock1Setting.getInterpreterGroup("user", "sharedProcess").get("session")); + } + + @Test + public void testRemoteRepl() throws Exception { + interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true)); + ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>(); + interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>())); + interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(), + Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null); + Map<String, InterpreterProperty> intp1Properties = new HashMap<String, InterpreterProperty>(); + intp1Properties.put("PROPERTY_1", + new InterpreterProperty("PROPERTY_1", "VALUE_1")); + intp1Properties.put("property_2", new InterpreterProperty("property_2", "value_2")); + interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), intp1Properties); + factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager); + List<InterpreterSetting> all = interpreterSettingManager.get(); + InterpreterSetting mock1Setting = null; + for (InterpreterSetting setting : all) { + if (setting.getName().equals("mock1")) { + mock1Setting = setting; + break; + } + } + InterpreterGroup interpreterGroup = mock1Setting.getInterpreterGroup("user", "sharedProcess"); + factory.createInterpretersForNote(mock1Setting, "user", "sharedProcess", "session"); + // get interpreter + assertNotNull("get 
Interpreter", interpreterGroup.get("session").get(0)); + assertTrue(interpreterGroup.get("session").get(0) instanceof LazyOpenInterpreter); + LazyOpenInterpreter lazyInterpreter = (LazyOpenInterpreter)(interpreterGroup.get("session").get(0)); + assertTrue(lazyInterpreter.getInnerInterpreter() instanceof RemoteInterpreter); + RemoteInterpreter remoteInterpreter = (RemoteInterpreter) lazyInterpreter.getInnerInterpreter(); + assertEquals("VALUE_1", remoteInterpreter.getEnv().get("PROPERTY_1")); + assertEquals("value_2", remoteInterpreter.getProperty("property_2")); + } + + /** + * 2 users' interpreters in scoped mode. Each user has one session. Restarting user1's interpreter + * won't affect user2's interpreter + * @throws Exception + */ + @Test + public void testRestartInterpreterInScopedMode() throws Exception { + interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true)); + ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>(); + interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>())); + interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(), + Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null); + Map<String, InterpreterProperty> intp1Properties = new HashMap<String, InterpreterProperty>(); + intp1Properties.put("PROPERTY_1", + new InterpreterProperty("PROPERTY_1", "VALUE_1")); + intp1Properties.put("property_2", + new InterpreterProperty("property_2", "value_2")); + interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), intp1Properties); + factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager); + List<InterpreterSetting> all = interpreterSettingManager.get(); + InterpreterSetting mock1Setting = null; + for (InterpreterSetting setting : all) { + if 
(setting.getName().equals("mock1")) { + mock1Setting = setting; + break; + } + } + mock1Setting.getOption().setPerUser("scoped"); + mock1Setting.getOption().setPerNote("shared"); + // set remote as false so that we won't create new remote interpreter process + mock1Setting.getOption().setRemote(false); + mock1Setting.getOption().setHost("localhost"); + mock1Setting.getOption().setPort(2222); + InterpreterGroup interpreterGroup = mock1Setting.getInterpreterGroup("user1", "sharedProcess"); + factory.createInterpretersForNote(mock1Setting, "user1", "sharedProcess", "user1"); + factory.createInterpretersForNote(mock1Setting, "user2", "sharedProcess", "user2"); + + LazyOpenInterpreter interpreter1 = (LazyOpenInterpreter)interpreterGroup.get("user1").get(0); + interpreter1.open(); + LazyOpenInterpreter interpreter2 = (LazyOpenInterpreter)interpreterGroup.get("user2").get(0); + interpreter2.open(); + + mock1Setting.closeAndRemoveInterpreterGroup("sharedProcess", "user1"); + assertFalse(interpreter1.isOpen()); + assertTrue(interpreter2.isOpen()); + } + + /** + * 2 users' interpreters in isolated mode. Each user has one interpreterGroup. 
Restarting user1's interpreter + * won't affect user2's interpreter + * @throws Exception + */ + @Test + public void testRestartInterpreterInIsolatedMode() throws Exception { + interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true)); + ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>(); + interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>())); + interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(), + Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null); + Map<String, InterpreterProperty> intp1Properties = new HashMap<String, InterpreterProperty>(); + intp1Properties.put("PROPERTY_1", + new InterpreterProperty("PROPERTY_1", "VALUE_1")); + intp1Properties.put("property_2", + new InterpreterProperty("property_2", "value_2")); + interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(true), intp1Properties); + factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager); + List<InterpreterSetting> all = interpreterSettingManager.get(); + InterpreterSetting mock1Setting = null; + for (InterpreterSetting setting : all) { + if (setting.getName().equals("mock1")) { + mock1Setting = setting; + break; + } + } + mock1Setting.getOption().setPerUser("isolated"); + mock1Setting.getOption().setPerNote("shared"); + // set remote as false so that we won't create new remote interpreter process + mock1Setting.getOption().setRemote(false); + mock1Setting.getOption().setHost("localhost"); + mock1Setting.getOption().setPort(2222); + InterpreterGroup interpreterGroup1 = mock1Setting.getInterpreterGroup("user1", "note1"); + InterpreterGroup interpreterGroup2 = mock1Setting.getInterpreterGroup("user2", "note2"); + factory.createInterpretersForNote(mock1Setting, "user1", "note1", 
"shared_session"); + factory.createInterpretersForNote(mock1Setting, "user2", "note2", "shared_session"); + + LazyOpenInterpreter interpreter1 = (LazyOpenInterpreter)interpreterGroup1.get("shared_session").get(0); + interpreter1.open(); + LazyOpenInterpreter interpreter2 = (LazyOpenInterpreter)interpreterGroup2.get("shared_session").get(0); + interpreter2.open(); + + mock1Setting.closeAndRemoveInterpreterGroup("note1", "user1"); + assertFalse(interpreter1.isOpen()); + assertTrue(interpreter2.isOpen()); + } + + @Test + public void testFactoryDefaultList() throws IOException, RepositoryException { + // get default settings + List<String> all = interpreterSettingManager.getDefaultInterpreterSettingList(); + assertTrue(interpreterSettingManager.get().size() >= all.size()); + } + + @Test + public void testExceptions() throws InterpreterException, IOException, RepositoryException { + List<String> all = interpreterSettingManager.getDefaultInterpreterSettingList(); + // add setting with null option & properties expected nullArgumentException.class + try { + interpreterSettingManager.add("mock2", new ArrayList<InterpreterInfo>(), new LinkedList<Dependency>(), new InterpreterOption(false), Collections.EMPTY_MAP, "", null); + } catch(NullArgumentException e) { + assertEquals("Test null option" , e.getMessage(),new NullArgumentException("option").getMessage()); + } + try { + interpreterSettingManager.add("mock2", new ArrayList<InterpreterInfo>(), new LinkedList<Dependency>(), new InterpreterOption(false), Collections.EMPTY_MAP, "", null); + } catch (NullArgumentException e){ + assertEquals("Test null properties" , e.getMessage(),new NullArgumentException("properties").getMessage()); + } + } + + + @Test + public void testSaveLoad() throws IOException, RepositoryException { + // interpreter settings + int numInterpreters = interpreterSettingManager.get().size(); + + // check if file saved + assertTrue(new File(conf.getInterpreterSettingPath()).exists()); + + 
interpreterSettingManager.createNewSetting("new-mock1", "mock1", new LinkedList<Dependency>(), new InterpreterOption(false), new HashMap<String, InterpreterProperty>()); + assertEquals(numInterpreters + 1, interpreterSettingManager.get().size()); + + interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true)); + + /* + Current situation, if InterpreterSettinfRef doesn't have the key of InterpreterSetting, it would be ignored. + Thus even though interpreter.json have several interpreterSetting in that file, it would be ignored and would not be initialized from loadFromFile. + In this case, only "mock11" would be referenced from file under interpreter/mock, and "mock11" group would be initialized. + */ + // TODO(jl): Decide how to handle the know referenced interpreterSetting. + assertEquals(1, interpreterSettingManager.get().size()); + } + + @Test + public void testInterpreterSettingPropertyClass() throws IOException, RepositoryException { + // check if default interpreter reference's property type is map + Map<String, InterpreterSetting> interpreterSettingRefs = interpreterSettingManager.getAvailableInterpreterSettings(); + InterpreterSetting intpSetting = interpreterSettingRefs.get("mock1"); + Map<String, DefaultInterpreterProperty> intpProperties = + (Map<String, DefaultInterpreterProperty>) intpSetting.getProperties(); + assertTrue(intpProperties instanceof Map); + + // check if interpreter instance is saved as Properties in conf/interpreter.json file + Map<String, InterpreterProperty> properties = new HashMap<String, InterpreterProperty>(); + properties.put("key1", new InterpreterProperty("key1", "value1", "type1")); + properties.put("key2", new InterpreterProperty("key2", "value2", "type2")); + + interpreterSettingManager.createNewSetting("newMock", "mock1", new LinkedList<Dependency>(), new InterpreterOption(false), properties); + + String confFilePath = conf.getInterpreterSettingPath(); + byte[] encoded = 
Files.readAllBytes(Paths.get(confFilePath)); + String json = new String(encoded, "UTF-8"); + + InterpreterInfoSaving infoSaving = InterpreterInfoSaving.fromJson(json); + Map<String, InterpreterSetting> interpreterSettings = infoSaving.interpreterSettings; + for (String key : interpreterSettings.keySet()) { + InterpreterSetting setting = interpreterSettings.get(key); + if (setting.getName().equals("newMock")) { + assertEquals(setting.getProperties().toString(), properties.toString()); + } + } + } + + @Test + public void testInterpreterAliases() throws IOException, RepositoryException { + interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true)); + factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager); + final InterpreterInfo info1 = new InterpreterInfo("className1", "name1", true, null); + final InterpreterInfo info2 = new InterpreterInfo("className2", "name1", true, null); + interpreterSettingManager.add("group1", new ArrayList<InterpreterInfo>() {{ + add(info1); + }}, new ArrayList<Dependency>(), new InterpreterOption(true), Collections.EMPTY_MAP, "/path1", null); + interpreterSettingManager.add("group2", new ArrayList<InterpreterInfo>(){{ + add(info2); + }}, new ArrayList<Dependency>(), new InterpreterOption(true), Collections.EMPTY_MAP, "/path2", null); + + final InterpreterSetting setting1 = interpreterSettingManager.createNewSetting("test-group1", "group1", new ArrayList<Dependency>(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>()); + final InterpreterSetting setting2 = interpreterSettingManager.createNewSetting("test-group2", "group1", new ArrayList<Dependency>(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>()); + + interpreterSettingManager.setInterpreters("user", "note", new ArrayList<String>() {{ + add(setting1.getId()); + add(setting2.getId()); + }}); + + assertEquals("className1", 
factory.getInterpreter("user1", "note", "test-group1").getClassName()); + assertEquals("className1", factory.getInterpreter("user1", "note", "group1").getClassName()); + } + + @Test + public void testMultiUser() throws IOException, RepositoryException { + interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true)); + factory = new InterpreterFactory(conf, null, null, null, depResolver, true, interpreterSettingManager); + final InterpreterInfo info1 = new InterpreterInfo("className1", "name1", true, null); + interpreterSettingManager.add("group1", new ArrayList<InterpreterInfo>(){{ + add(info1); + }}, new ArrayList<Dependency>(), new InterpreterOption(true), Collections.EMPTY_MAP, "/path1", null); + + InterpreterOption perUserInterpreterOption = new InterpreterOption(true, InterpreterOption.ISOLATED, InterpreterOption.SHARED); + final InterpreterSetting setting1 = interpreterSettingManager.createNewSetting("test-group1", "group1", new ArrayList<Dependency>(), perUserInterpreterOption, new HashMap<String, InterpreterProperty>()); + + interpreterSettingManager.setInterpreters("user1", "note", new ArrayList<String>() {{ + add(setting1.getId()); + }}); + + interpreterSettingManager.setInterpreters("user2", "note", new ArrayList<String>() {{ + add(setting1.getId()); + }}); + + assertNotEquals(factory.getInterpreter("user1", "note", "test-group1"), factory.getInterpreter("user2", "note", "test-group1")); + } + + + @Test + public void testInvalidInterpreterSettingName() { + try { + interpreterSettingManager.createNewSetting("new.mock1", "mock1", new LinkedList<Dependency>(), new InterpreterOption(false), new HashMap<String, InterpreterProperty>()); + fail("expect fail because of invalid InterpreterSetting Name"); + } catch (IOException e) { + assertEquals("'.' 
is invalid for InterpreterSetting name.", e.getMessage()); + } + } + + + @Test + public void getEditorSetting() throws IOException, RepositoryException, SchedulerException { + List<String> intpIds = new ArrayList<>(); + for(InterpreterSetting intpSetting: interpreterSettingManager.get()) { + if (intpSetting.getName().startsWith("mock1")) { + intpIds.add(intpSetting.getId()); + } + } + Note note = notebook.createNote(intpIds, new AuthenticationInfo("anonymous")); + + Interpreter interpreter = factory.getInterpreter("user1", note.getId(), "mock11"); + // get editor setting from interpreter-setting.json + Map<String, Object> editor = interpreterSettingManager.getEditorSetting(interpreter, "user1", note.getId(), "mock11"); + assertEquals("java", editor.get("language")); + + // when interpreter is not loaded via interpreter-setting.json + // or editor setting doesn't exit + editor = interpreterSettingManager.getEditorSetting(factory.getInterpreter("user1", note.getId(), "mock1"),"user1", note.getId(), "mock1"); + assertEquals(null, editor.get("language")); + + // when interpreter is not bound to note + editor = interpreterSettingManager.getEditorSetting(factory.getInterpreter("user1", note.getId(), "mock11"),"user1", note.getId(), "mock2"); + assertEquals("text", editor.get("language")); + } + + @Test + public void registerCustomInterpreterRunner() throws IOException { + InterpreterSettingManager spyInterpreterSettingManager = spy(interpreterSettingManager); + + doNothing().when(spyInterpreterSettingManager).saveToFile(); + + ArrayList<InterpreterInfo> interpreterInfos1 = new ArrayList<>(); + interpreterInfos1.add(new InterpreterInfo("name1.class", "name1", true, Maps.<String, Object>newHashMap())); + + spyInterpreterSettingManager.add("normalGroup1", interpreterInfos1, Lists.<Dependency>newArrayList(), new InterpreterOption(true), Maps.<String, DefaultInterpreterProperty>newHashMap(), "/normalGroup1", null); + + 
spyInterpreterSettingManager.createNewSetting("normalGroup1", "normalGroup1", Lists.<Dependency>newArrayList(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>()); + + ArrayList<InterpreterInfo> interpreterInfos2 = new ArrayList<>(); + interpreterInfos2.add(new InterpreterInfo("name1.class", "name1", true, Maps.<String, Object>newHashMap())); + + InterpreterRunner mockInterpreterRunner = mock(InterpreterRunner.class); + + when(mockInterpreterRunner.getPath()).thenReturn("custom-linux-path.sh"); + + spyInterpreterSettingManager.add("customGroup1", interpreterInfos2, Lists.<Dependency>newArrayList(), new InterpreterOption(true), Maps.<String, DefaultInterpreterProperty>newHashMap(), "/customGroup1", mockInterpreterRunner); + + spyInterpreterSettingManager.createNewSetting("customGroup1", "customGroup1", Lists.<Dependency>newArrayList(), new InterpreterOption(true), new HashMap<String, InterpreterProperty>()); + + spyInterpreterSettingManager.setInterpreters("anonymous", "noteCustome", spyInterpreterSettingManager.getDefaultInterpreterSettingList()); + + factory.getInterpreter("anonymous", "noteCustome", "customGroup1"); + + verify(mockInterpreterRunner, times(1)).getPath(); + } + + @Test + public void interpreterRunnerTest() { + InterpreterRunner mockInterpreterRunner = mock(InterpreterRunner.class); + String testInterpreterRunner = "relativePath.sh"; + when(mockInterpreterRunner.getPath()).thenReturn(testInterpreterRunner); // This test only for Linux + Interpreter i = factory.createRemoteRepl("path1", "sessionKey", "className", new Properties(), interpreterSettingManager.get().get(0).getId(), "userName", false, mockInterpreterRunner); + String interpreterRunner = ((RemoteInterpreter) ((LazyOpenInterpreter) i).getInnerInterpreter()).getInterpreterRunner(); + assertNotEquals(interpreterRunner, testInterpreterRunner); + + testInterpreterRunner = "/AbsolutePath.sh"; + when(mockInterpreterRunner.getPath()).thenReturn(testInterpreterRunner); + i = 
factory.createRemoteRepl("path1", "sessionKey", "className", new Properties(), interpreterSettingManager.get().get(0).getId(), "userName", false, mockInterpreterRunner); + interpreterRunner = ((RemoteInterpreter) ((LazyOpenInterpreter) i).getInnerInterpreter()).getInterpreterRunner(); + assertEquals(interpreterRunner, testInterpreterRunner); + } +}