// Extracted from the following commit diff (header preserved for provenance):
// http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
// ----------------------------------------------------------------------
// diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
// new file mode 100644
// index 0000000..3f84cd0
// --- /dev/null
// +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
// @@ -0,0 +1,911 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zeppelin.interpreter;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.annotations.SerializedName;
import com.google.gson.internal.StringMap;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.dep.Dependency;
import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.remote.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE;
import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
import static org.apache.zeppelin.util.IdHashes.generateId;

/**
 * Represent one InterpreterSetting in the interpreter setting page.
 *
 * <p>An instance holds the persisted configuration (id, name, group, properties, dependencies,
 * option, runner) plus transient runtime state: the {@link InterpreterGroup}s created from this
 * setting, the listeners handed to them, and the status of dependency loading.
 */
public class InterpreterSetting {

  private static final Logger LOGGER = LoggerFactory.getLogger(InterpreterSetting.class);
  private static final String SHARED_PROCESS = "shared_process";
  private static final String SHARED_SESSION = "shared_session";
  private static final Map<String, Object> DEFAULT_EDITOR = ImmutableMap.of(
      "language", (Object) "text",
      "editOnDblClick", false);

  private String id;
  private String name;
  // the original interpreter setting template name where it is created from
  private String group;

  //TODO(zjffdu) make the interpreter.json consistent with interpreter-setting.json
  /**
   * properties can be either Properties or Map<String, InterpreterProperty>
   * properties should be:
   * - Properties when Interpreter instances are saved to `conf/interpreter.json` file
   * - Map<String, InterpreterProperty> when Interpreters are registered
   * : this is needed after https://github.com/apache/zeppelin/pull/1145
   * which changed the way of getting default interpreter setting AKA interpreterSettingsRef
   * Note(mina): In order to simplify the implementation, I chose to change properties
   * from Properties to Object instead of creating new classes.
   */
  private Object properties = new Properties();

  private Status status;
  private String errorReason;

  @SerializedName("interpreterGroup")
  private List<InterpreterInfo> interpreterInfos;

  private List<Dependency> dependencies = new ArrayList<>();
  private InterpreterOption option = new InterpreterOption(true);

  @SerializedName("runner")
  private InterpreterRunner interpreterRunner;

  ///////////////////////////////////////////////////////////////////////////////////////////
  private transient InterpreterSettingManager interpreterSettingManager;
  private transient String interpreterDir;
  private final transient Map<String, InterpreterGroup> interpreterGroups =
      new ConcurrentHashMap<>();

  private final transient ReentrantReadWriteLock.ReadLock interpreterGroupReadLock;
  private final transient ReentrantReadWriteLock.WriteLock interpreterGroupWriteLock;

  private transient AngularObjectRegistryListener angularObjectRegistryListener;
  private transient RemoteInterpreterProcessListener remoteInterpreterProcessListener;
  private transient ApplicationEventListener appEventListener;
  private transient DependencyResolver dependencyResolver;

  private transient Map<String, String> infos;

  // Map of the note and paragraphs which has runtime infos generated by this interpreter setting.
  // This map is used to clear the infos in paragraph when the interpretersetting is restarted
  private transient Map<String, Set<String>> runtimeInfosToBeCleared;

  private transient ZeppelinConfiguration conf = new ZeppelinConfiguration();

  private transient Map<String, URLClassLoader> cleanCl =
      Collections.synchronizedMap(new HashMap<String, URLClassLoader>());
  ///////////////////////////////////////////////////////////////////////////////////////////


  /**
   * Builder class for InterpreterSetting.
   *
   * <p>Every setter writes straight into the single instance created by the constructor;
   * {@link #create()} finalizes that instance and returns it.
   */
  public static class Builder {
    private final InterpreterSetting interpreterSetting;

    public Builder() {
      this.interpreterSetting = new InterpreterSetting();
    }

    public Builder setId(String id) {
      interpreterSetting.id = id;
      return this;
    }

    public Builder setName(String name) {
      interpreterSetting.name = name;
      return this;
    }

    public Builder setGroup(String group) {
      interpreterSetting.group = group;
      return this;
    }

    public Builder setInterpreterInfos(List<InterpreterInfo> interpreterInfos) {
      interpreterSetting.interpreterInfos = interpreterInfos;
      return this;
    }

    public Builder setProperties(Object properties) {
      interpreterSetting.properties = properties;
      return this;
    }

    public Builder setOption(InterpreterOption option) {
      interpreterSetting.option = option;
      return this;
    }

    public Builder setInterpreterDir(String interpreterDir) {
      interpreterSetting.interpreterDir = interpreterDir;
      return this;
    }

    public Builder setRunner(InterpreterRunner runner) {
      interpreterSetting.interpreterRunner = runner;
      return this;
    }

    public Builder setDependencies(List<Dependency> dependencies) {
      interpreterSetting.dependencies = dependencies;
      return this;
    }

    public Builder setConf(ZeppelinConfiguration conf) {
      interpreterSetting.conf = conf;
      return this;
    }

    public Builder setDependencyResolver(DependencyResolver dependencyResolver) {
      interpreterSetting.dependencyResolver = dependencyResolver;
      return this;
    }

    /**
     * Sets the owning {@link InterpreterSettingManager}.
     *
     * <p>The method name is misspelled ("Intepreter"); it is kept for existing callers.
     * {@link #setInterpreterSettingManager(InterpreterSettingManager)} is the correctly spelled
     * equivalent.
     */
    public Builder setIntepreterSettingManager(
        InterpreterSettingManager interpreterSettingManager) {
      interpreterSetting.interpreterSettingManager = interpreterSettingManager;
      return this;
    }

    /** Correctly spelled equivalent of {@link #setIntepreterSettingManager}. */
    public Builder setInterpreterSettingManager(
        InterpreterSettingManager interpreterSettingManager) {
      return setIntepreterSettingManager(interpreterSettingManager);
    }

    public Builder setRemoteInterpreterProcessListener(RemoteInterpreterProcessListener
                                                       remoteInterpreterProcessListener) {
      interpreterSetting.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
      return this;
    }

    public Builder setAngularObjectRegistryListener(
        AngularObjectRegistryListener angularObjectRegistryListener) {
      interpreterSetting.angularObjectRegistryListener = angularObjectRegistryListener;
      return this;
    }

    public Builder setApplicationEventListener(ApplicationEventListener applicationEventListener) {
      interpreterSetting.appEventListener = applicationEventListener;
      return this;
    }

    /**
     * Runs {@link InterpreterSetting#postProcessing()} on the instance being built and
     * returns it.
     */
    public InterpreterSetting create() {
      // post processing
      interpreterSetting.postProcessing();
      return interpreterSetting;
    }
  }

  /**
   * Creates an empty setting with a freshly generated id and the read/write lock pair that
   * guards access to the interpreter groups.
   */
  public InterpreterSetting() {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    this.id = generateId();
    interpreterGroupReadLock = lock.readLock();
    interpreterGroupWriteLock = lock.writeLock();
  }

  /** Marks this setting as {@link Status#READY}. */
  void postProcessing() {
    this.status = Status.READY;
  }

  /**
   * Create interpreter from InterpreterSettingTemplate
   *
   * <p>The new setting gets its own id (generated by {@link #InterpreterSetting()}); properties
   * are run through {@link #convertInterpreterProperties(Object)} and the list-valued fields
   * are copied so later mutation does not touch the template.
   *
   * @param o interpreterSetting from InterpreterSettingTemplate
   */
  public InterpreterSetting(InterpreterSetting o) {
    this();
    this.name = o.name;
    this.group = o.group;
    this.properties = convertInterpreterProperties(o.getProperties());
    this.interpreterInfos = new ArrayList<>(o.getInterpreterInfos());
    this.option = InterpreterOption.fromInterpreterOption(o.getOption());
    this.dependencies = new ArrayList<>(o.getDependencies());
    this.interpreterDir = o.getInterpreterDir();
    this.interpreterRunner = o.getInterpreterRunner();
    this.conf = o.getConf();
  }

  public AngularObjectRegistryListener getAngularObjectRegistryListener() {
    return angularObjectRegistryListener;
  }

  public RemoteInterpreterProcessListener getRemoteInterpreterProcessListener() {
    return remoteInterpreterProcessListener;
  }

  public ApplicationEventListener getAppEventListener() {
    return appEventListener;
  }

  public DependencyResolver getDependencyResolver() {
    return dependencyResolver;
  }

  public InterpreterSettingManager getInterpreterSettingManager() {
    return interpreterSettingManager;
  }

  public void setAngularObjectRegistryListener(AngularObjectRegistryListener
                                                   angularObjectRegistryListener) {
    this.angularObjectRegistryListener = angularObjectRegistryListener;
  }

  public void setAppEventListener(ApplicationEventListener appEventListener) {
    this.appEventListener = appEventListener;
  }

  public void setRemoteInterpreterProcessListener(RemoteInterpreterProcessListener
                                                      remoteInterpreterProcessListener) {
    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
  }

  public void setDependencyResolver(DependencyResolver dependencyResolver) {
    this.dependencyResolver = dependencyResolver;
  }

  public void setInterpreterSettingManager(InterpreterSettingManager interpreterSettingManager) {
    this.interpreterSettingManager = interpreterSettingManager;
  }

  public String getId() {
    return id;
  }

  public String getName() {
    return name;
  }

  public String getGroup() {
    return group;
  }

  /**
   * Builds the key under which the interpreter group for {@code user}/{@code noteId} is stored.
   *
   * <p>The key is {@code <settingId>:<suffix>} where the suffix is the existing-process
   * constant, a {@code user:noteId} pair (each part left empty unless the option isolates on
   * it), or the shared-process constant.
   */
  private String getInterpreterGroupId(String user, String noteId) {
    String key;
    if (option.isExistingProcess()) {
      key = Constants.EXISTING_PROCESS;
    } else if (getOption().isProcess()) {
      key = (option.perUserIsolated() ? user : "") + ":" + (option.perNoteIsolated() ? noteId : "");
    } else {
      key = SHARED_PROCESS;
    }

    //TODO(zjffdu) we encode interpreter setting id into groupId, this is not a good design
    return id + ":" + key;
  }

  /**
   * Builds the session id for {@code user}/{@code noteId} according to the per-user and
   * per-note scoping flags of the option.
   */
  private String getInterpreterSessionId(String user, String noteId) {
    if (option.isExistingProcess()) {
      return Constants.EXISTING_PROCESS;
    }
    boolean perNote = option.perNoteScoped();
    boolean perUser = option.perUserScoped();
    if (perNote && perUser) {
      return user + ":" + noteId;
    }
    if (perUser) {
      return user;
    }
    if (perNote) {
      return noteId;
    }
    return SHARED_SESSION;
  }

  /**
   * Returns the interpreter group for {@code user}/{@code noteId}, creating and registering
   * it under the write lock when it does not exist yet.
   */
  public InterpreterGroup getOrCreateInterpreterGroup(String user, String noteId) {
    String groupId = getInterpreterGroupId(user, noteId);
    // Acquire before entering try so that finally never unlocks a lock we do not hold.
    interpreterGroupWriteLock.lock();
    try {
      if (!interpreterGroups.containsKey(groupId)) {
        LOGGER.info("Create InterpreterGroup with groupId {} for user {} and note {}",
            groupId, user, noteId);
        InterpreterGroup intpGroup = createInterpreterGroup(groupId);
        interpreterGroups.put(groupId, intpGroup);
      }
      return interpreterGroups.get(groupId);
    } finally {
      interpreterGroupWriteLock.unlock();
    }
  }

  /** Removes the group registered under {@code groupId}, if any. */
  void removeInterpreterGroup(String groupId) {
    this.interpreterGroups.remove(groupId);
  }

  /**
   * Looks up the interpreter group for {@code user}/{@code noteId} under the read lock.
   *
   * @return the group, or {@code null} when none has been created
   */
  InterpreterGroup getInterpreterGroup(String user, String noteId) {
    String groupId = getInterpreterGroupId(user, noteId);
    interpreterGroupReadLock.lock();
    try {
      return interpreterGroups.get(groupId);
    } finally {
      interpreterGroupReadLock.unlock();
    }
  }

  /** Looks up an interpreter group directly by its group id; {@code null} when absent. */
  InterpreterGroup getInterpreterGroup(String groupId) {
    return interpreterGroups.get(groupId);
  }

  /** Returns a snapshot copy of all interpreter groups currently registered. */
  @VisibleForTesting
  public ArrayList<InterpreterGroup> getAllInterpreterGroups() {
    interpreterGroupReadLock.lock();
    try {
      return new ArrayList<>(interpreterGroups.values());
    } finally {
      interpreterGroupReadLock.unlock();
    }
  }

  /**
   * Returns the editor settings of the first interpreter info whose class name equals
   * {@code className}, or the default editor when there is no match or the match has no editor.
   */
  Map<String, Object> getEditorFromSettingByClassName(String className) {
    for (InterpreterInfo intpInfo : interpreterInfos) {
      if (className.equals(intpInfo.getClassName())) {
        Map<String, Object> editor = intpInfo.getEditor();
        // Only the first matching info is consulted.
        return editor != null ? editor : DEFAULT_EDITOR;
      }
    }
    return DEFAULT_EDITOR;
  }

  /** Closes the session of {@code user}/{@code noteId} in its interpreter group, if any. */
  void closeInterpreters(String user, String noteId) {
    InterpreterGroup interpreterGroup = getInterpreterGroup(user, noteId);
    if (interpreterGroup != null) {
      String sessionId = getInterpreterSessionId(user, noteId);
      interpreterGroup.close(sessionId);
    }
  }

  /** Closes every interpreter group of this setting and drops the cached runtime infos. */
  public void close() {
    LOGGER.info("Close InterpreterSetting: {}", name);
    for (InterpreterGroup intpGroup : interpreterGroups.values()) {
      intpGroup.close();
    }
    interpreterGroups.clear();
    this.runtimeInfosToBeCleared = null;
    this.infos = null;
  }

  /**
   * Replaces the properties of this setting.
   *
   * <p>A Gson {@link StringMap} is copied into a fresh {@link Properties}; any other object is
   * stored as-is.
   *
   * @param object the new properties
   */
  @SuppressWarnings("unchecked")
  public void setProperties(Object object) {
    if (object instanceof StringMap) {
      // Convert the argument, not the current field value.
      StringMap<String> map = (StringMap<String>) object;
      Properties newProperties = new Properties();
      for (Map.Entry<String, String> entry : map.entrySet()) {
        newProperties.put(entry.getKey(), entry.getValue());
      }
      this.properties = newProperties;
    } else {
      this.properties = object;
    }
  }


  public Object getProperties() {
    return properties;
  }

  /** Adds or overwrites a single property; requires properties to be a mutable map. */
  @VisibleForTesting
  @SuppressWarnings("unchecked")
  public void setProperty(String name, String value) {
    ((Map<String, InterpreterProperty>) properties).put(name, new InterpreterProperty(name, value));
  }

  // This method is supposed to be only called by InterpreterSetting
  // but not InterpreterSetting Template
  /**
   * Flattens the interpreter properties into a {@link Properties} object.
   *
   * <p>Adds the output limit and max pool size from the Zeppelin configuration when they are
   * not set explicitly, and always sets {@code zeppelin.interpreter.localRepo}. Properties
   * whose value is {@code null} are skipped, since {@link Properties} cannot hold them.
   */
  @SuppressWarnings("unchecked")
  public Properties getJavaProperties() {
    Properties jProperties = new Properties();
    Map<String, InterpreterProperty> iProperties = (Map<String, InterpreterProperty>) properties;
    for (Map.Entry<String, InterpreterProperty> entry : iProperties.entrySet()) {
      Object value = entry.getValue().getValue();
      if (value != null) {
        jProperties.setProperty(entry.getKey(), value.toString());
      }
    }

    if (!jProperties.containsKey("zeppelin.interpreter.output.limit")) {
      jProperties.setProperty("zeppelin.interpreter.output.limit",
          conf.getInt(ZEPPELIN_INTERPRETER_OUTPUT_LIMIT) + "");
    }

    if (!jProperties.containsKey("zeppelin.interpreter.max.poolsize")) {
      jProperties.setProperty("zeppelin.interpreter.max.poolsize",
          conf.getInt(ZEPPELIN_INTERPRETER_MAX_POOL_SIZE) + "");
    }

    String interpreterLocalRepoPath = conf.getInterpreterLocalRepoPath();
    //TODO(zjffdu) change it to interpreterDir/{interpreter_name}
    jProperties.setProperty("zeppelin.interpreter.localRepo",
        interpreterLocalRepoPath + "/" + id);
    return jProperties;
  }

  public ZeppelinConfiguration getConf() {
    return conf;
  }

  public void setConf(ZeppelinConfiguration conf) {
    this.conf = conf;
  }

  public List<Dependency> getDependencies() {
    return dependencies;
  }

  /** Replaces the dependency list and starts loading the dependencies in the background. */
  public void setDependencies(List<Dependency> dependencies) {
    this.dependencies = dependencies;
    loadInterpreterDependencies();
  }

  public InterpreterOption getOption() {
    return option;
  }

  public void setOption(InterpreterOption option) {
    this.option = option;
  }

  public String getInterpreterDir() {
    return interpreterDir;
  }

  public void setInterpreterDir(String interpreterDir) {
    this.interpreterDir = interpreterDir;
  }

  public List<InterpreterInfo> getInterpreterInfos() {
    return interpreterInfos;
  }

  /**
   * Adds the given dependencies that are not already present, then starts loading the
   * dependencies in the background.
   */
  void appendDependencies(List<Dependency> dependencies) {
    for (Dependency dependency : dependencies) {
      if (!this.dependencies.contains(dependency)) {
        this.dependencies.add(dependency);
      }
    }
    loadInterpreterDependencies();
  }

  void setInterpreterOption(InterpreterOption interpreterOption) {
    this.option = interpreterOption;
  }

  /** Stores the given {@link Properties} as the properties of this setting, unconverted. */
  public void setProperties(Properties p) {
    this.properties = p;
  }

  void setGroup(String group) {
    this.group = group;
  }

  void setName(String name) {
    this.name = name;
  }

  /**
   * Interpreter status
   */
  public enum Status {
    DOWNLOADING_DEPENDENCIES,
    ERROR,
    READY
  }

  public Status getStatus() {
    return status;
  }

  public void setStatus(Status status) {
    this.status = status;
  }

  public String getErrorReason() {
    return errorReason;
  }

  public void setErrorReason(String errorReason) {
    this.errorReason = errorReason;
  }

  public void setInterpreterInfos(List<InterpreterInfo> interpreterInfos) {
    this.interpreterInfos = interpreterInfos;
  }

  public void setInfos(Map<String, String> infos) {
    this.infos = infos;
  }

  public Map<String, String> getInfos() {
    return infos;
  }

  public InterpreterRunner getInterpreterRunner() {
    return interpreterRunner;
  }

  public void setInterpreterRunner(InterpreterRunner interpreterRunner) {
    this.interpreterRunner = interpreterRunner;
  }

  /** Records that paragraph {@code paraId} of note {@code noteId} has runtime infos. */
  public void addNoteToPara(String noteId, String paraId) {
    if (runtimeInfosToBeCleared == null) {
      runtimeInfosToBeCleared = new HashMap<>();
    }
    Set<String> paraIdSet = runtimeInfosToBeCleared.get(noteId);
    if (paraIdSet == null) {
      paraIdSet = new HashSet<>();
      runtimeInfosToBeCleared.put(noteId, paraIdSet);
    }
    paraIdSet.add(paraId);
  }

  /** Returns the noteId to paragraph-id map, or {@code null} when nothing was recorded. */
  public Map<String, Set<String>> getNoteIdAndParaMap() {
    return runtimeInfosToBeCleared;
  }

  public void clearNoteIdAndParaMap() {
    runtimeInfosToBeCleared = null;
  }


  //////////////////////////// IMPORTANT ////////////////////////////////////////////////
  ///////////////////////////////////////////////////////////////////////////////////////
  ///////////////////////////////////////////////////////////////////////////////////////
  // This is the only place to create interpreters. For now we always create multiple interpreter
  // together (one session). We don't support to create single interpreter yet.
  /**
   * Creates one interpreter per {@link InterpreterInfo} of this setting.
   *
   * <p>Remote or local instances are created depending on {@code option.isRemote()}. An info
   * flagged as default is inserted at the head of the returned list.
   */
  List<Interpreter> createInterpreters(String user, String sessionId) {
    List<Interpreter> interpreters = new ArrayList<>();
    for (InterpreterInfo info : getInterpreterInfos()) {
      Interpreter interpreter;
      if (option.isRemote()) {
        interpreter = new RemoteInterpreter(getJavaProperties(), sessionId,
            info.getClassName(), user);
      } else {
        interpreter = createLocalInterpreter(info.getClassName());
      }

      if (info.isDefaultInterpreter()) {
        interpreters.add(0, interpreter);
      } else {
        interpreters.add(interpreter);
      }
      LOGGER.info("Interpreter {} created for user: {}, sessionId: {}",
          interpreter.getClassName(), user, sessionId);
    }
    return interpreters;
  }

  // Create Interpreter in ZeppelinServer for non-remote mode
  /**
   * Instantiates {@code className} reflectively through its {@code (Properties)} constructor
   * and wraps it in a {@link LazyOpenInterpreter}.
   *
   * <p>The thread context class loader is swapped while the class is loaded and instantiated,
   * and restored afterwards.
   *
   * @throws InterpreterException when the class cannot be loaded or instantiated
   */
  @SuppressWarnings("unchecked")
  private Interpreter createLocalInterpreter(String className)
      throws InterpreterException {
    LOGGER.info("Create Local Interpreter {} from {}", className, interpreterDir);

    ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
    try {

      URLClassLoader ccl = cleanCl.get(interpreterDir);
      if (ccl == null) {
        // classloader fallback
        ccl = URLClassLoader.newInstance(new URL[]{}, oldcl);
      }

      boolean separateCL = true;
      try { // check if server's classloader has driver already.
        Class.forName(className);
        separateCL = false;
      } catch (Exception e) {
        LOGGER.error("exception checking server classloader driver", e);
      }

      URLClassLoader cl = separateCL ? URLClassLoader.newInstance(new URL[]{}, ccl) : ccl;
      Thread.currentThread().setContextClassLoader(cl);

      Class<Interpreter> replClass = (Class<Interpreter>) cl.loadClass(className);
      Constructor<Interpreter> constructor =
          replClass.getConstructor(new Class[]{Properties.class});
      Interpreter repl = constructor.newInstance(getJavaProperties());
      repl.setClassloaderUrls(ccl.getURLs());
      return new LazyOpenInterpreter(new ClassloaderInterpreter(repl, cl));
    } catch (SecurityException | NoSuchMethodException | IllegalArgumentException
        | InstantiationException | IllegalAccessException | InvocationTargetException
        | ClassNotFoundException e) {
      throw new InterpreterException(e);
    } finally {
      Thread.currentThread().setContextClassLoader(oldcl);
    }
  }

  /**
   * Creates the process wrapper for this setting: a handle to an already running process when
   * the option says so, otherwise a managed process started through the configured runner.
   */
  RemoteInterpreterProcess createInterpreterProcess() {
    int connectTimeout =
        conf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
    String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + id;
    if (option.isExistingProcess()) {
      // TODO(zjffdu) remove the existing process approach seems no one is using this.
      // use the existing process
      return new RemoteInterpreterRunningProcess(
          connectTimeout,
          remoteInterpreterProcessListener,
          appEventListener,
          option.getHost(),
          option.getPort());
    }
    // create new remote process
    String runnerPath = interpreterRunner != null
        ? interpreterRunner.getPath()
        : conf.getInterpreterRemoteRunnerPath();
    return new RemoteInterpreterManagedProcess(
        runnerPath, interpreterDir, localRepoPath,
        getEnvFromInterpreterProperty(getJavaProperties()), connectTimeout,
        remoteInterpreterProcessListener, appEventListener, group);
  }

  /**
   * Derives the environment for the interpreter process from its properties: env-style keys
   * are copied through, and {@code master} plus Spark properties are folded into
   * {@code ZEPPELIN_SPARK_CONF}.
   */
  private Map<String, String> getEnvFromInterpreterProperty(Properties property) {
    Map<String, String> env = new HashMap<String, String>();
    StringBuilder sparkConfBuilder = new StringBuilder();
    for (String key : property.stringPropertyNames()) {
      String value = property.getProperty(key);
      if (RemoteInterpreterUtils.isEnvString(key)) {
        env.put(key, value);
      }
      if (key.equals("master")) {
        sparkConfBuilder.append(" --master " + value);
      }
      if (isSparkConf(key, value)) {
        sparkConfBuilder.append(" --conf " + key + "=" + toShellFormat(value));
      }
    }
    env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
    return env;
  }

  /**
   * Quotes {@code value}: double quotes when it contains a single quote, single quotes
   * otherwise.
   *
   * @throws RuntimeException when the value contains both quote characters
   */
  private String toShellFormat(String value) {
    boolean hasSingleQuote = value.contains("'");
    if (hasSingleQuote && value.contains("\"")) {
      throw new RuntimeException("Spark property value could not contain both \" and '");
    }
    return hasSingleQuote ? "\"" + value + "\"" : "'" + value + "'";
  }

  /** True when {@code key} starts with {@code spark.} and {@code value} is not empty. */
  static boolean isSparkConf(String key, String value) {
    return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value);
  }

  /** Returns the interpreters of the session for {@code user}/{@code noteId}, creating it. */
  private List<Interpreter> getOrCreateSession(String user, String noteId) {
    InterpreterGroup interpreterGroup = getOrCreateInterpreterGroup(user, noteId);
    // Guava message templates only substitute %s, not SLF4J-style {}.
    Preconditions.checkNotNull(interpreterGroup, "No InterpreterGroup existed for user %s, " +
        "noteId %s", user, noteId);
    String sessionId = getInterpreterSessionId(user, noteId);
    return interpreterGroup.getOrCreateSession(user, sessionId);
  }

  /** Returns the first interpreter of the session for {@code user}/{@code noteId}. */
  public Interpreter getDefaultInterpreter(String user, String noteId) {
    return getOrCreateSession(user, noteId).get(0);
  }

  /**
   * Returns the interpreter registered under {@code replName} in the session of
   * {@code user}/{@code noteId}.
   *
   * @return the interpreter, or {@code null} when this setting has no interpreter of that name
   * @throws NullPointerException when {@code noteId} or {@code replName} is null
   */
  public Interpreter getInterpreter(String user, String noteId, String replName) {
    Preconditions.checkNotNull(noteId, "noteId should be not null");
    Preconditions.checkNotNull(replName, "replName should be not null");

    String className = getInterpreterClassFromInterpreterSetting(replName);
    if (className == null) {
      return null;
    }
    List<Interpreter> interpreters = getOrCreateSession(user, noteId);
    for (Interpreter interpreter : interpreters) {
      if (className.equals(interpreter.getClassName())) {
        return interpreter;
      }
    }
    return null;
  }

  /** Maps an interpreter name to its class name; {@code null} when the name is unknown. */
  private String getInterpreterClassFromInterpreterSetting(String replName) {
    Preconditions.checkNotNull(replName, "replName should be not null");

    for (InterpreterInfo info : interpreterInfos) {
      // replName is non-null, so equals() also rejects a null info name.
      if (replName.equals(info.getName())) {
        return info.getClassName();
      }
    }
    return null;
  }

  /**
   * Creates a new {@link InterpreterGroup} for {@code groupId} and attaches an angular object
   * registry matching the remote/local mode of this setting.
   */
  private InterpreterGroup createInterpreterGroup(String groupId) throws InterpreterException {
    AngularObjectRegistry angularObjectRegistry;
    InterpreterGroup interpreterGroup = new InterpreterGroup(groupId, this);
    if (option.isRemote()) {
      angularObjectRegistry =
          new RemoteAngularObjectRegistry(groupId, angularObjectRegistryListener, interpreterGroup);
    } else {
      angularObjectRegistry = new AngularObjectRegistry(id, angularObjectRegistryListener);
      // TODO(moon) : create distributed resource pool for local interpreters and set
    }

    interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
    return interpreterGroup;
  }

  /**
   * Loads the dependencies of this setting on a new background thread.
   *
   * <p>The status is set to {@link Status#DOWNLOADING_DEPENDENCIES} immediately; the thread
   * sets it to {@link Status#READY} on success, or to {@link Status#ERROR} with an error
   * reason when anything throws.
   */
  private void loadInterpreterDependencies() {
    setStatus(Status.DOWNLOADING_DEPENDENCIES);
    setErrorReason(null);
    Thread t = new Thread() {
      @Override
      public void run() {
        try {
          // dependencies to prevent library conflict
          File localRepoDir = new File(conf.getInterpreterLocalRepoPath() + "/" + getId());
          if (localRepoDir.exists()) {
            try {
              FileUtils.forceDelete(localRepoDir);
            } catch (FileNotFoundException e) {
              LOGGER.info("A file that does not exist cannot be deleted, nothing to worry", e);
            }
          }

          // load dependencies
          List<Dependency> deps = getDependencies();
          if (deps != null) {
            File destDir = new File(
                conf.getRelativeDir(ZeppelinConfiguration.ConfVars.ZEPPELIN_DEP_LOCALREPO));
            for (Dependency d : deps) {
              if (d.getExclusions() != null) {
                dependencyResolver.load(d.getGroupArtifactVersion(), d.getExclusions(),
                    new File(destDir, id));
              } else {
                dependencyResolver
                    .load(d.getGroupArtifactVersion(), new File(destDir, id));
              }
            }
          }

          setStatus(Status.READY);
          setErrorReason(null);
        } catch (Exception e) {
          LOGGER.error(String.format("Error while downloading repos for interpreter group : %s," +
                  " go to interpreter setting page click on edit and save it again to make " +
                  "this interpreter work properly. : %s",
              getGroup(), e.getLocalizedMessage()), e);
          setErrorReason(e.getLocalizedMessage());
          setStatus(Status.ERROR);
        }
      }
    };

    t.start();
  }

  //TODO(zjffdu) ugly code, should not use JsonObject as parameter. not readable
  /**
   * Appends every entry of the {@code option.users} array in {@code jsonObject} to the owners
   * of this setting's option. Does nothing when the object, its {@code option} member or the
   * {@code users} array is missing.
   */
  public void convertPermissionsFromUsersToOwners(JsonObject jsonObject) {
    if (jsonObject == null) {
      return;
    }
    JsonObject jsonOption = jsonObject.getAsJsonObject("option");
    if (jsonOption == null) {
      return;
    }
    JsonArray users = jsonOption.getAsJsonArray("users");
    if (users == null) {
      return;
    }
    if (this.option.getOwners() == null) {
      this.option.owners = new LinkedList<>();
    }
    for (JsonElement user : users) {
      this.option.getOwners().add(user.getAsString());
    }
  }

  // For backward compatibility of interpreter.json format after ZEPPELIN-2403
  /**
   * Converts the supported property representations into a name to
   * {@link InterpreterProperty} map.
   *
   * @param properties a Gson {@link StringMap} or another {@link Map}
   * @return the converted map, or {@code properties} itself when it is treated as already
   *     converted
   * @throws RuntimeException when {@code properties} is null, is not a map, or holds a value
   *     of an unsupported type
   */
  @SuppressWarnings("unchecked")
  static Map<String, InterpreterProperty> convertInterpreterProperties(Object properties) {
    if (properties instanceof StringMap) {
      Map<String, InterpreterProperty> newProperties = new HashMap<>();
      for (Map.Entry<String, ?> entry : ((StringMap<?>) properties).entrySet()) {
        if (entry.getValue() instanceof StringMap) {
          // already converted
          // NOTE(review): values are still StringMap here, not InterpreterProperty; kept as
          // in the original, confirm that callers cope with this.
          return (Map<String, InterpreterProperty>) properties;
        }
        InterpreterProperty newProperty = new InterpreterProperty(
            entry.getKey(),
            entry.getValue(),
            InterpreterPropertyType.STRING.getValue());
        newProperties.put(entry.getKey(), newProperty);
      }
      return newProperties;

    } else if (properties instanceof Map) {
      Map<String, Object> dProperties = (Map<String, Object>) properties;
      Map<String, InterpreterProperty> newProperties = new HashMap<>();
      for (Map.Entry<String, Object> entry : dProperties.entrySet()) {
        String key = entry.getKey();
        Object value = entry.getValue();
        if (value instanceof InterpreterProperty) {
          return (Map<String, InterpreterProperty>) properties;
        } else if (value instanceof StringMap) {
          StringMap<?> stringMap = (StringMap<?>) value;
          Object type = stringMap.get("type");
          InterpreterProperty newProperty = new InterpreterProperty(
              key,
              stringMap.get("value"),
              // fall back to "string" when the type is missing, like the branch below
              type != null ? type.toString() : "string");

          newProperties.put(newProperty.getName(), newProperty);
        } else if (value instanceof DefaultInterpreterProperty) {
          DefaultInterpreterProperty dProperty = (DefaultInterpreterProperty) value;
          InterpreterProperty property = new InterpreterProperty(
              key,
              dProperty.getValue(),
              dProperty.getType() != null ? dProperty.getType() : "string"
              // in case user forget to specify type in interpreter-setting.json
          );
          newProperties.put(key, property);
        } else {
          throw new RuntimeException("Can not convert this type of property: " +
              (value == null ? "null" : value.getClass()));
        }
      }
      return newProperties;
    }
    throw new RuntimeException("Can not convert this type: " +
        (properties == null ? "null" : properties.getClass()));
  }
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java new file mode 100644 index 0000000..ed3ebd8 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java @@ -0,0 +1,886 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.interpreter; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.zeppelin.dep.Dependency; +import org.apache.zeppelin.dep.DependencyResolver; +import org.apache.zeppelin.display.AngularObjectRegistryListener; +import org.apache.zeppelin.helium.ApplicationEventListener; +import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; +import org.apache.zeppelin.resource.Resource; +import org.apache.zeppelin.resource.ResourcePool; +import org.apache.zeppelin.resource.ResourceSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.sonatype.aether.repository.Authentication; +import org.sonatype.aether.repository.Proxy; +import org.sonatype.aether.repository.RemoteRepository; + +import java.io.*; +import java.lang.reflect.Type; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.file.DirectoryStream.Filter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.*; + +/** + * InterpreterSettingManager is the component which manage all the interpreter settings. 
+ * (load/create/update/remove/get) + * Besides that InterpreterSettingManager also manage the interpreter setting binding. + * TODO(zjffdu) We could move it into another separated component. + */ +public class InterpreterSettingManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(InterpreterSettingManager.class); + private static final Map<String, Object> DEFAULT_EDITOR = ImmutableMap.of( + "language", (Object) "text", + "editOnDblClick", false); + + private final ZeppelinConfiguration conf; + private final Path interpreterDirPath; + private final Path interpreterSettingPath; + + /** + * This is only InterpreterSetting templates with default name and properties + * name --> InterpreterSetting + */ + private final Map<String, InterpreterSetting> interpreterSettingTemplates = + Maps.newConcurrentMap(); + /** + * This is used by creating and running Interpreters + * id --> InterpreterSetting + * TODO(zjffdu) change it to name --> InterpreterSetting + */ + private final Map<String, InterpreterSetting> interpreterSettings = + Maps.newConcurrentMap(); + + /** + * noteId --> list of InterpreterSettingId + */ + private final Map<String, List<String>> interpreterBindings = + Maps.newConcurrentMap(); + + private final List<RemoteRepository> interpreterRepositories; + private InterpreterOption defaultOption; + private List<String> interpreterGroupOrderList; + private final Gson gson; + + private AngularObjectRegistryListener angularObjectRegistryListener; + private RemoteInterpreterProcessListener remoteInterpreterProcessListener; + private ApplicationEventListener appEventListener; + private DependencyResolver dependencyResolver; + + + public InterpreterSettingManager(ZeppelinConfiguration zeppelinConfiguration, + AngularObjectRegistryListener angularObjectRegistryListener, + RemoteInterpreterProcessListener + remoteInterpreterProcessListener, + ApplicationEventListener appEventListener) + throws IOException { + this(zeppelinConfiguration, new 
InterpreterOption(true), + angularObjectRegistryListener, + remoteInterpreterProcessListener, + appEventListener); + } + + @VisibleForTesting + public InterpreterSettingManager(ZeppelinConfiguration conf, + InterpreterOption defaultOption, + AngularObjectRegistryListener angularObjectRegistryListener, + RemoteInterpreterProcessListener + remoteInterpreterProcessListener, + ApplicationEventListener appEventListener) throws IOException { + this.conf = conf; + this.defaultOption = defaultOption; + this.interpreterDirPath = Paths.get(conf.getInterpreterDir()); + LOGGER.debug("InterpreterRootPath: {}", interpreterDirPath); + this.interpreterSettingPath = Paths.get(conf.getInterpreterSettingPath()); + LOGGER.debug("InterpreterBindingPath: {}", interpreterSettingPath); + this.dependencyResolver = new DependencyResolver( + conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO)); + this.interpreterRepositories = dependencyResolver.getRepos(); + this.interpreterGroupOrderList = Arrays.asList(conf.getString( + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER).split(",")); + this.gson = new GsonBuilder().setPrettyPrinting().create(); + + this.angularObjectRegistryListener = angularObjectRegistryListener; + this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; + this.appEventListener = appEventListener; + init(); + } + + /** + * Load interpreter setting from interpreter-setting.json + */ + private void loadFromFile() { + if (!Files.exists(interpreterSettingPath)) { + // nothing to read + LOGGER.warn("Interpreter Setting file {} doesn't exist", interpreterSettingPath); + return; + } + + try { + InterpreterInfoSaving infoSaving = InterpreterInfoSaving.loadFromFile(interpreterSettingPath); + //TODO(zjffdu) still ugly (should move all to InterpreterInfoSaving) + for (InterpreterSetting savedInterpreterSetting : infoSaving.interpreterSettings.values()) { + savedInterpreterSetting.setConf(conf); + savedInterpreterSetting.setInterpreterSettingManager(this); + 
savedInterpreterSetting.setAngularObjectRegistryListener(angularObjectRegistryListener); + savedInterpreterSetting.setRemoteInterpreterProcessListener( + remoteInterpreterProcessListener); + savedInterpreterSetting.setAppEventListener(appEventListener); + savedInterpreterSetting.setDependencyResolver(dependencyResolver); + savedInterpreterSetting.setProperties(InterpreterSetting.convertInterpreterProperties( + savedInterpreterSetting.getProperties() + )); + + InterpreterSetting interpreterSettingTemplate = + interpreterSettingTemplates.get(savedInterpreterSetting.getGroup()); + // InterpreterSettingTemplate is from interpreter-setting.json which represent the latest + // InterpreterSetting, while InterpreterSetting is from interpreter.json which represent + // the user saved interpreter setting + if (interpreterSettingTemplate != null) { + savedInterpreterSetting.setInterpreterDir(interpreterSettingTemplate.getInterpreterDir()); + // merge properties from interpreter-setting.json and interpreter.json + Map<String, InterpreterProperty> mergedProperties = + new HashMap<>(InterpreterSetting.convertInterpreterProperties( + interpreterSettingTemplate.getProperties())); + mergedProperties.putAll(InterpreterSetting.convertInterpreterProperties( + savedInterpreterSetting.getProperties())); + savedInterpreterSetting.setProperties(mergedProperties); + // merge InterpreterInfo + savedInterpreterSetting.setInterpreterInfos( + interpreterSettingTemplate.getInterpreterInfos()); + } else { + LOGGER.warn("No InterpreterSetting Template found for InterpreterSetting: " + + savedInterpreterSetting.getGroup()); + } + + // Overwrite the default InterpreterSetting we registered from InterpreterSetting Templates + // remove it first + for (InterpreterSetting setting : interpreterSettings.values()) { + if (setting.getName().equals(savedInterpreterSetting.getName())) { + interpreterSettings.remove(setting.getId()); + } + } + savedInterpreterSetting.postProcessing(); + LOGGER.info("Create 
Interpreter Setting {} from interpreter.json", + savedInterpreterSetting.getName()); + interpreterSettings.put(savedInterpreterSetting.getId(), savedInterpreterSetting); + } + + interpreterBindings.putAll(infoSaving.interpreterBindings); + + if (infoSaving.interpreterRepositories != null) { + for (RemoteRepository repo : infoSaving.interpreterRepositories) { + if (!dependencyResolver.getRepos().contains(repo)) { + this.interpreterRepositories.add(repo); + } + } + } + } catch (IOException e) { + LOGGER.error("Fail to load interpreter setting configuration file: " + + interpreterSettingPath, e); + } + } + + public void saveToFile() throws IOException { + synchronized (interpreterSettings) { + InterpreterInfoSaving info = new InterpreterInfoSaving(); + info.interpreterBindings = interpreterBindings; + info.interpreterSettings = interpreterSettings; + info.interpreterRepositories = interpreterRepositories; + info.saveToFile(interpreterSettingPath); + } + } + + private void init() throws IOException { + + // 1. detect interpreter setting via interpreter-setting.json in each interpreter folder + // 2. detect interpreter setting in interpreter.json that is saved before + String interpreterJson = conf.getInterpreterJson(); + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + if (Files.exists(interpreterDirPath)) { + for (Path interpreterDir : Files + .newDirectoryStream(interpreterDirPath, new Filter<Path>() { + @Override + public boolean accept(Path entry) throws IOException { + return Files.exists(entry) && Files.isDirectory(entry); + } + })) { + String interpreterDirString = interpreterDir.toString(); + + /** + * Register interpreter by the following ordering + * 1. Register it from path {ZEPPELIN_HOME}/interpreter/{interpreter_name}/ + * interpreter-setting.json + * 2. 
Register it from interpreter-setting.json in classpath + * {ZEPPELIN_HOME}/interpreter/{interpreter_name} + */ + if (!registerInterpreterFromPath(interpreterDirString, interpreterJson)) { + if (!registerInterpreterFromResource(cl, interpreterDirString, interpreterJson)) { + LOGGER.warn("No interpreter-setting.json found in " + interpreterDirPath); + } + } + } + } else { + LOGGER.warn("InterpreterDir {} doesn't exist", interpreterDirPath); + } + + loadFromFile(); + saveToFile(); + } + + private boolean registerInterpreterFromResource(ClassLoader cl, String interpreterDir, + String interpreterJson) throws IOException { + URL[] urls = recursiveBuildLibList(new File(interpreterDir)); + ClassLoader tempClassLoader = new URLClassLoader(urls, cl); + + URL url = tempClassLoader.getResource(interpreterJson); + if (url == null) { + return false; + } + + LOGGER.debug("Reading interpreter-setting.json from {} as Resource", url); + List<RegisteredInterpreter> registeredInterpreterList = + getInterpreterListFromJson(url.openStream()); + registerInterpreterSetting(registeredInterpreterList, interpreterDir); + return true; + } + + private boolean registerInterpreterFromPath(String interpreterDir, String interpreterJson) + throws IOException { + + Path interpreterJsonPath = Paths.get(interpreterDir, interpreterJson); + if (Files.exists(interpreterJsonPath)) { + LOGGER.debug("Reading interpreter-setting.json from file {}", interpreterJsonPath); + List<RegisteredInterpreter> registeredInterpreterList = + getInterpreterListFromJson(new FileInputStream(interpreterJsonPath.toFile())); + registerInterpreterSetting(registeredInterpreterList, interpreterDir); + return true; + } + return false; + } + + private List<RegisteredInterpreter> getInterpreterListFromJson(InputStream stream) { + Type registeredInterpreterListType = new TypeToken<List<RegisteredInterpreter>>() { + }.getType(); + return gson.fromJson(new InputStreamReader(stream), registeredInterpreterListType); + } + + private void 
registerInterpreterSetting(List<RegisteredInterpreter> registeredInterpreters, + String interpreterDir) throws IOException { + + Map<String, DefaultInterpreterProperty> properties = new HashMap<>(); + List<InterpreterInfo> interpreterInfos = new ArrayList<>(); + InterpreterOption option = defaultOption; + String group = null; + InterpreterRunner runner = null; + for (RegisteredInterpreter registeredInterpreter : registeredInterpreters) { + //TODO(zjffdu) merge RegisteredInterpreter & InterpreterInfo + InterpreterInfo interpreterInfo = + new InterpreterInfo(registeredInterpreter.getClassName(), registeredInterpreter.getName(), + registeredInterpreter.isDefaultInterpreter(), registeredInterpreter.getEditor()); + group = registeredInterpreter.getGroup(); + runner = registeredInterpreter.getRunner(); + // use defaultOption if it is not specified in interpreter-setting.json + if (registeredInterpreter.getOption() != null) { + option = registeredInterpreter.getOption(); + } + properties.putAll(registeredInterpreter.getProperties()); + interpreterInfos.add(interpreterInfo); + } + + InterpreterSetting interpreterSettingTemplate = new InterpreterSetting.Builder() + .setGroup(group) + .setName(group) + .setInterpreterInfos(interpreterInfos) + .setProperties(properties) + .setDependencies(new ArrayList<Dependency>()) + .setOption(option) + .setRunner(runner) + .setInterpreterDir(interpreterDir) + .setRunner(runner) + .setConf(conf) + .setIntepreterSettingManager(this) + .create(); + + LOGGER.info("Register InterpreterSettingTemplate & InterpreterSetting: {}", + interpreterSettingTemplate.getName()); + interpreterSettingTemplates.put(interpreterSettingTemplate.getName(), + interpreterSettingTemplate); + + InterpreterSetting interpreterSetting = new InterpreterSetting(interpreterSettingTemplate); + interpreterSetting.setAngularObjectRegistryListener(angularObjectRegistryListener); + interpreterSetting.setRemoteInterpreterProcessListener(remoteInterpreterProcessListener); + 
interpreterSetting.setAppEventListener(appEventListener); + interpreterSetting.setDependencyResolver(dependencyResolver); + interpreterSetting.setInterpreterSettingManager(this); + interpreterSetting.postProcessing(); + interpreterSettings.put(interpreterSetting.getId(), interpreterSetting); + } + + @VisibleForTesting + public InterpreterSetting getDefaultInterpreterSetting(String noteId) { + return getInterpreterSettings(noteId).get(0); + } + + public List<InterpreterSetting> getInterpreterSettings(String noteId) { + List<InterpreterSetting> settings = new ArrayList<>(); + synchronized (interpreterSettings) { + List<String> interpreterSettingIds = interpreterBindings.get(noteId); + if (interpreterSettingIds != null) { + for (String settingId : interpreterSettingIds) { + if (interpreterSettings.containsKey(settingId)) { + settings.add(interpreterSettings.get(settingId)); + } else { + LOGGER.warn("InterpreterSetting {} has been removed, but note {} still bind to it.", + settingId, noteId); + } + } + } + } + return settings; + } + + public InterpreterGroup getInterpreterGroupById(String groupId) { + for (InterpreterSetting setting : interpreterSettings.values()) { + InterpreterGroup interpreterGroup = setting.getInterpreterGroup(groupId); + if (interpreterGroup != null) { + return interpreterGroup; + } + } + return null; + } + + //TODO(zjffdu) logic here is a little ugly + public Map<String, Object> getEditorSetting(Interpreter interpreter, String user, String noteId, + String replName) { + Map<String, Object> editor = DEFAULT_EDITOR; + String group = StringUtils.EMPTY; + try { + String defaultSettingName = getDefaultInterpreterSetting(noteId).getName(); + List<InterpreterSetting> intpSettings = getInterpreterSettings(noteId); + for (InterpreterSetting intpSetting : intpSettings) { + String[] replNameSplit = replName.split("\\."); + if (replNameSplit.length == 2) { + group = replNameSplit[0]; + } + // when replName is 'name' of interpreter + if 
(defaultSettingName.equals(intpSetting.getName())) { + editor = intpSetting.getEditorFromSettingByClassName(interpreter.getClassName()); + } + // when replName is 'alias name' of interpreter or 'group' of interpreter + if (replName.equals(intpSetting.getName()) || group.equals(intpSetting.getName())) { + editor = intpSetting.getEditorFromSettingByClassName(interpreter.getClassName()); + break; + } + } + } catch (NullPointerException e) { + // Use `debug` level because this log occurs frequently + LOGGER.debug("Couldn't get interpreter editor setting"); + } + return editor; + } + + public List<InterpreterGroup> getAllInterpreterGroup() { + List<InterpreterGroup> interpreterGroups = new ArrayList<>(); + for (InterpreterSetting interpreterSetting : interpreterSettings.values()) { + interpreterGroups.addAll(interpreterSetting.getAllInterpreterGroups()); + } + return interpreterGroups; + } + + //TODO(zjffdu) move Resource related api to ResourceManager + public ResourceSet getAllResources() { + return getAllResourcesExcept(null); + } + + private ResourceSet getAllResourcesExcept(String interpreterGroupExcludsion) { + ResourceSet resourceSet = new ResourceSet(); + for (InterpreterGroup intpGroup : getAllInterpreterGroup()) { + if (interpreterGroupExcludsion != null && + intpGroup.getId().equals(interpreterGroupExcludsion)) { + continue; + } + + RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess(); + if (remoteInterpreterProcess == null) { + ResourcePool localPool = intpGroup.getResourcePool(); + if (localPool != null) { + resourceSet.addAll(localPool.getAll()); + } + } else if (remoteInterpreterProcess.isRunning()) { + List<String> resourceList = remoteInterpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<List<String>>() { + @Override + public List<String> call(RemoteInterpreterService.Client client) throws Exception { + return client.resourcePoolGetAll(); + } + }); + for (String res : resourceList) { 
+ resourceSet.add(Resource.fromJson(res)); + } + } + } + return resourceSet; + } + + public void removeResourcesBelongsToParagraph(String noteId, String paragraphId) { + for (InterpreterGroup intpGroup : getAllInterpreterGroup()) { + ResourceSet resourceSet = new ResourceSet(); + RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess(); + if (remoteInterpreterProcess == null) { + ResourcePool localPool = intpGroup.getResourcePool(); + if (localPool != null) { + resourceSet.addAll(localPool.getAll()); + } + if (noteId != null) { + resourceSet = resourceSet.filterByNoteId(noteId); + } + if (paragraphId != null) { + resourceSet = resourceSet.filterByParagraphId(paragraphId); + } + + for (Resource r : resourceSet) { + localPool.remove( + r.getResourceId().getNoteId(), + r.getResourceId().getParagraphId(), + r.getResourceId().getName()); + } + } else if (remoteInterpreterProcess.isRunning()) { + List<String> resourceList = remoteInterpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<List<String>>() { + @Override + public List<String> call(RemoteInterpreterService.Client client) throws Exception { + return client.resourcePoolGetAll(); + } + }); + for (String res : resourceList) { + resourceSet.add(Resource.fromJson(res)); + } + + if (noteId != null) { + resourceSet = resourceSet.filterByNoteId(noteId); + } + if (paragraphId != null) { + resourceSet = resourceSet.filterByParagraphId(paragraphId); + } + + for (final Resource r : resourceSet) { + remoteInterpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<Void>() { + + @Override + public Void call(RemoteInterpreterService.Client client) throws Exception { + client.resourceRemove( + r.getResourceId().getNoteId(), + r.getResourceId().getParagraphId(), + r.getResourceId().getName()); + return null; + } + }); + } + } + } + } + + public void removeResourcesBelongsToNote(String noteId) { + removeResourcesBelongsToParagraph(noteId, 
null); + } + + /** + * Overwrite dependency jar under local-repo/{interpreterId} + * if jar file in original path is changed + */ + private void copyDependenciesFromLocalPath(final InterpreterSetting setting) { + setting.setStatus(InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES); + synchronized (interpreterSettings) { + final Thread t = new Thread() { + public void run() { + try { + List<Dependency> deps = setting.getDependencies(); + if (deps != null) { + for (Dependency d : deps) { + File destDir = new File( + conf.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO)); + + int numSplits = d.getGroupArtifactVersion().split(":").length; + if (!(numSplits >= 3 && numSplits <= 6)) { + dependencyResolver.copyLocalDependency(d.getGroupArtifactVersion(), + new File(destDir, setting.getId())); + } + } + } + setting.setStatus(InterpreterSetting.Status.READY); + } catch (Exception e) { + LOGGER.error(String.format("Error while copying deps for interpreter group : %s," + + " go to interpreter setting page click on edit and save it again to make " + + "this interpreter work properly.", + setting.getGroup()), e); + setting.setErrorReason(e.getLocalizedMessage()); + setting.setStatus(InterpreterSetting.Status.ERROR); + } finally { + + } + } + }; + t.start(); + } + } + + /** + * Return ordered interpreter setting list. + * The list does not contain more than one setting from the same interpreter class. 
+ * Order by InterpreterClass (order defined by ZEPPELIN_INTERPRETERS), Interpreter setting name + */ + public List<String> getInterpreterSettingIds() { + List<String> settingIdList = new ArrayList<>(); + for (InterpreterSetting interpreterSetting : get()) { + settingIdList.add(interpreterSetting.getId()); + } + return settingIdList; + } + + public InterpreterSetting createNewSetting(String name, String group, + List<Dependency> dependencies, InterpreterOption option, Map<String, InterpreterProperty> p) + throws IOException { + + if (name.indexOf(".") >= 0) { + throw new IOException("'.' is invalid for InterpreterSetting name."); + } + // check if name is existed + for (InterpreterSetting interpreterSetting : interpreterSettings.values()) { + if (interpreterSetting.getName().equals(name)) { + throw new IOException("Interpreter " + name + " already existed"); + } + } + InterpreterSetting setting = new InterpreterSetting(interpreterSettingTemplates.get(group)); + setting.setName(name); + setting.setGroup(group); + //TODO(zjffdu) Should use setDependencies + setting.appendDependencies(dependencies); + setting.setInterpreterOption(option); + setting.setProperties(p); + setting.setAppEventListener(appEventListener); + setting.setRemoteInterpreterProcessListener(remoteInterpreterProcessListener); + setting.setDependencyResolver(dependencyResolver); + setting.setAngularObjectRegistryListener(angularObjectRegistryListener); + setting.setInterpreterSettingManager(this); + setting.postProcessing(); + interpreterSettings.put(setting.getId(), setting); + saveToFile(); + return setting; + } + + @VisibleForTesting + public void addInterpreterSetting(InterpreterSetting interpreterSetting) { + interpreterSettingTemplates.put(interpreterSetting.getName(), interpreterSetting); + interpreterSetting.setAppEventListener(appEventListener); + interpreterSetting.setDependencyResolver(dependencyResolver); + interpreterSetting.setAngularObjectRegistryListener(angularObjectRegistryListener); 
+ interpreterSetting.setRemoteInterpreterProcessListener(remoteInterpreterProcessListener); + interpreterSetting.setInterpreterSettingManager(this); + interpreterSettings.put(interpreterSetting.getId(), interpreterSetting); + } + + /** + * map interpreter ids into noteId + * + * @param user user name + * @param noteId note id + * @param settingIdList InterpreterSetting id list + */ + public void setInterpreterBinding(String user, String noteId, List<String> settingIdList) + throws IOException { + List<String> unBindedSettingIdList = new LinkedList<>(); + + synchronized (interpreterSettings) { + List<String> oldSettingIdList = interpreterBindings.get(noteId); + if (oldSettingIdList != null) { + for (String oldSettingId : oldSettingIdList) { + if (!settingIdList.contains(oldSettingId)) { + unBindedSettingIdList.add(oldSettingId); + } + } + } + interpreterBindings.put(noteId, settingIdList); + saveToFile(); + + for (String settingId : unBindedSettingIdList) { + InterpreterSetting interpreterSetting = interpreterSettings.get(settingId); + //TODO(zjffdu) Add test for this scenario + //only close Interpreters when it is note scoped + if (interpreterSetting.getOption().perNoteIsolated() || + interpreterSetting.getOption().perNoteScoped()) { + interpreterSetting.closeInterpreters(user, noteId); + } + } + } + } + + public List<String> getInterpreterBinding(String noteId) { + return interpreterBindings.get(noteId); + } + + @VisibleForTesting + public void closeNote(String user, String noteId) { + // close interpreters in this note session + LOGGER.info("Close Note: {}", noteId); + List<InterpreterSetting> settings = getInterpreterSettings(noteId); + for (InterpreterSetting setting : settings) { + setting.closeInterpreters(user, noteId); + } + } + + public Map<String, InterpreterSetting> getInterpreterSettingTemplates() { + return interpreterSettingTemplates; + } + + private URL[] recursiveBuildLibList(File path) throws MalformedURLException { + URL[] urls = new URL[0]; + if 
(path == null || !path.exists()) { + return urls; + } else if (path.getName().startsWith(".")) { + return urls; + } else if (path.isDirectory()) { + File[] files = path.listFiles(); + if (files != null) { + for (File f : files) { + urls = (URL[]) ArrayUtils.addAll(urls, recursiveBuildLibList(f)); + } + } + return urls; + } else { + return new URL[]{path.toURI().toURL()}; + } + } + + public List<RemoteRepository> getRepositories() { + return this.interpreterRepositories; + } + + public void addRepository(String id, String url, boolean snapshot, Authentication auth, + Proxy proxy) throws IOException { + dependencyResolver.addRepo(id, url, snapshot, auth, proxy); + saveToFile(); + } + + public void removeRepository(String id) throws IOException { + dependencyResolver.delRepo(id); + saveToFile(); + } + + public void removeNoteInterpreterSettingBinding(String user, String noteId) throws IOException { + setInterpreterBinding(user, noteId, new ArrayList<String>()); + interpreterBindings.remove(noteId); + } + + /** + * Change interpreter property and restart + */ + public void setPropertyAndRestart(String id, InterpreterOption option, + Map<String, InterpreterProperty> properties, + List<Dependency> dependencies) throws IOException { + synchronized (interpreterSettings) { + InterpreterSetting intpSetting = interpreterSettings.get(id); + if (intpSetting != null) { + try { + intpSetting.close(); + intpSetting.setOption(option); + intpSetting.setProperties(properties); + intpSetting.setDependencies(dependencies); + intpSetting.postProcessing(); + saveToFile(); + } catch (Exception e) { + loadFromFile(); + throw e; + } + } else { + throw new InterpreterException("Interpreter setting id " + id + " not found"); + } + } + } + + // restart in note page + public void restart(String settingId, String noteId, String user) { + InterpreterSetting intpSetting = interpreterSettings.get(settingId); + Preconditions.checkNotNull(intpSetting); + synchronized (interpreterSettings) { + 
intpSetting = interpreterSettings.get(settingId); + // Check if dependency in specified path is changed + // If it did, overwrite old dependency jar with new one + if (intpSetting != null) { + //clean up metaInfos + intpSetting.setInfos(null); + copyDependenciesFromLocalPath(intpSetting); + + if (user.equals("anonymous")) { + intpSetting.close(); + } else { + intpSetting.closeInterpreters(user, noteId); + } + + } else { + throw new InterpreterException("Interpreter setting id " + settingId + " not found"); + } + } + } + + public void restart(String id) { + restart(id, "", "anonymous"); + } + + public InterpreterSetting get(String id) { + synchronized (interpreterSettings) { + return interpreterSettings.get(id); + } + } + + @VisibleForTesting + public InterpreterSetting getByName(String name) { + for (InterpreterSetting interpreterSetting : interpreterSettings.values()) { + if (interpreterSetting.getName().equals(name)) { + return interpreterSetting; + } + } + throw new RuntimeException("No InterpreterSetting: " + name); + } + + public void remove(String id) throws IOException { + // 1. close interpreter groups of this interpreter setting + // 2. remove this interpreter setting + // 3. remove this interpreter setting from note binding + // 4. 
clean local repo directory + LOGGER.info("Remove interpreter setting: " + id); + synchronized (interpreterSettings) { + if (interpreterSettings.containsKey(id)) { + + InterpreterSetting intp = interpreterSettings.get(id); + intp.close(); + interpreterSettings.remove(id); + for (List<String> settings : interpreterBindings.values()) { + Iterator<String> it = settings.iterator(); + while (it.hasNext()) { + String settingId = it.next(); + if (settingId.equals(id)) { + it.remove(); + } + } + } + saveToFile(); + } + } + + File localRepoDir = new File(conf.getInterpreterLocalRepoPath() + "/" + id); + FileUtils.deleteDirectory(localRepoDir); + } + + /** + * Get interpreter settings + */ + public List<InterpreterSetting> get() { + synchronized (interpreterSettings) { + List<InterpreterSetting> orderedSettings = new ArrayList<>(interpreterSettings.values()); + Collections.sort(orderedSettings, new Comparator<InterpreterSetting>() { + @Override + public int compare(InterpreterSetting o1, InterpreterSetting o2) { + int i = interpreterGroupOrderList.indexOf(o1.getGroup()); + int j = interpreterGroupOrderList.indexOf(o2.getGroup()); + if (i < 0) { + LOGGER.warn("InterpreterGroup " + o1.getGroup() + + " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName()); + // move the unknown interpreter to last + i = Integer.MAX_VALUE; + } + if (j < 0) { + LOGGER.warn("InterpreterGroup " + o2.getGroup() + + " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName()); + // move the unknown interpreter to last + j = Integer.MAX_VALUE; + } + if (i < j) { + return -1; + } else if (i > j) { + return 1; + } else { + return 0; + } + } + }); + return orderedSettings; + } + } + + @VisibleForTesting + public List<String> getSettingIds() { + List<String> settingIds = new ArrayList<>(); + for (InterpreterSetting interpreterSetting : get()) { + settingIds.add(interpreterSetting.getId()); + } + return settingIds; + } + + public void close(String settingId) { 
+ get(settingId).close(); + } + + public void close() { + List<Thread> closeThreads = new LinkedList<>(); + synchronized (interpreterSettings) { + Collection<InterpreterSetting> intpSettings = interpreterSettings.values(); + for (final InterpreterSetting intpSetting : intpSettings) { + Thread t = new Thread() { + public void run() { + intpSetting.close(); + } + }; + t.start(); + closeThreads.add(t); + } + } + + for (Thread t : closeThreads) { + try { + t.join(); + } catch (InterruptedException e) { + LOGGER.error("Can't close interpreterGroup", e); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java new file mode 100644 index 0000000..0817595 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter.install; + +import org.apache.commons.io.FileUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.dep.DependencyResolver; +import org.apache.zeppelin.util.Util; +import org.sonatype.aether.RepositoryException; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Commandline utility to install interpreter from maven repository + */ +public class InstallInterpreter { + private final File interpreterListFile; + private final File interpreterBaseDir; + private final List<AvailableInterpreterInfo> availableInterpreters; + private final String localRepoDir; + private URL proxyUrl; + private String proxyUser; + private String proxyPassword; + + /** + * + * @param interpreterListFile + * @param interpreterBaseDir interpreter directory for installing binaries + * @throws IOException + */ + public InstallInterpreter(File interpreterListFile, File interpreterBaseDir, String localRepoDir) + throws IOException { + this.interpreterListFile = interpreterListFile; + this.interpreterBaseDir = interpreterBaseDir; + this.localRepoDir = localRepoDir; + availableInterpreters = new LinkedList<>(); + readAvailableInterpreters(); + } + + + /** + * Information for available informations + */ + private static class AvailableInterpreterInfo { + public final String name; + public final String artifact; + public final String description; + + public AvailableInterpreterInfo(String name, String artifact, String description) { + this.name = name; + this.artifact = artifact; + this.description = description; + } + } + + private void readAvailableInterpreters() throws IOException { + if (!interpreterListFile.isFile()) { + 
System.err.println("Can't find interpreter list " + interpreterListFile.getAbsolutePath()); + return; + } + String text = FileUtils.readFileToString(interpreterListFile); + String[] lines = text.split("\n"); + + Pattern pattern = Pattern.compile("(\\S+)\\s+(\\S+)\\s+(.*)"); + + int lineNo = 0; + for (String line : lines) { + lineNo++; + if (line == null || line.length() == 0 || line.startsWith("#")) { + continue; + } + + Matcher match = pattern.matcher(line); + if (match.groupCount() != 3) { + System.err.println("Error on line " + lineNo + ", " + line); + continue; + } + + match.find(); + + String name = match.group(1); + String artifact = match.group(2); + String description = match.group(3); + + availableInterpreters.add(new AvailableInterpreterInfo(name, artifact, description)); + } + } + + public List<AvailableInterpreterInfo> list() { + for (AvailableInterpreterInfo info : availableInterpreters) { + System.out.println(info.name + "\t\t\t" + info.description); + } + + return availableInterpreters; + } + + public void installAll() { + for (AvailableInterpreterInfo info : availableInterpreters) { + install(info.name, info.artifact); + } + } + + public void install(String [] names) { + for (String name : names) { + install(name); + } + } + + public void install(String name) { + // find artifact name + for (AvailableInterpreterInfo info : availableInterpreters) { + if (name.equals(info.name)) { + install(name, info.artifact); + return; + } + } + + throw new RuntimeException("Can't find interpreter '" + name + "'"); + } + + public void install(String [] names, String [] artifacts) { + if (names.length != artifacts.length) { + throw new RuntimeException("Length of given names and artifacts are different"); + } + + for (int i = 0; i < names.length; i++) { + install(names[i], artifacts[i]); + } + } + + public void install(String name, String artifact) { + DependencyResolver depResolver = new DependencyResolver(localRepoDir); + if (proxyUrl != null) { + 
depResolver.setProxy(proxyUrl, proxyUser, proxyPassword); + } + + File installDir = new File(interpreterBaseDir, name); + if (installDir.exists()) { + System.err.println("Directory " + installDir.getAbsolutePath() + + " already exists" + + "\n\nSkipped"); + return; + } + + System.out.println("Install " + name + "(" + artifact + ") to " + + installDir.getAbsolutePath() + " ... "); + + try { + depResolver.load(artifact, installDir); + System.out.println("Interpreter " + name + " installed under " + + installDir.getAbsolutePath() + "."); + startTip(); + } catch (RepositoryException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public void setProxy(URL proxyUrl, String proxyUser, String proxyPassword) { + this.proxyUrl = proxyUrl; + this.proxyUser = proxyUser; + this.proxyPassword = proxyPassword; + } + + public static void usage() { + System.out.println("Options"); + System.out.println(" -l, --list List available interpreters"); + System.out.println(" -a, --all Install all available interpreters"); + System.out.println(" -n, --name [NAMES] Install interpreters (comma separated " + + "list)" + + "e.g. md,shell,jdbc,python,angular"); + System.out.println(" -t, --artifact [ARTIFACTS] (Optional with -n) custom artifact names" + + ". " + + "(comma separated list correspond to --name) " + + "e.g. customGroup:customArtifact:customVersion"); + System.out.println(" --proxy-url [url] (Optional) proxy url. 
http(s)://host:port"); + System.out.println(" --proxy-user [user] (Optional) proxy user"); + System.out.println(" --proxy-password [password] (Optional) proxy password"); + } + + public static void main(String [] args) throws IOException { + if (args.length == 0) { + usage(); + return; + } + + ZeppelinConfiguration conf = ZeppelinConfiguration.create(); + InstallInterpreter installer = new InstallInterpreter( + new File(conf.getInterpreterListPath()), + new File(conf.getInterpreterDir()), + conf.getInterpreterLocalRepoPath()); + + String names = null; + String artifacts = null; + URL proxyUrl = null; + String proxyUser = null; + String proxyPassword = null; + boolean all = false; + + for (int i = 0; i < args.length; i++) { + String arg = args[i].toLowerCase(Locale.US); + switch (arg) { + case "--list": + case "-l": + installer.list(); + System.exit(0); + break; + case "--all": + case "-a": + all = true; + break; + case "--name": + case "-n": + names = args[++i]; + break; + case "--artifact": + case "-t": + artifacts = args[++i]; + break; + case "--version": + case "-v": + Util.getVersion(); + break; + case "--proxy-url": + proxyUrl = new URL(args[++i]); + break; + case "--proxy-user": + proxyUser = args[++i]; + break; + case "--proxy-password": + proxyPassword = args[++i]; + break; + case "--help": + case "-h": + usage(); + System.exit(0); + break; + default: + System.out.println("Unknown option " + arg); + } + } + + if (proxyUrl != null) { + installer.setProxy(proxyUrl, proxyUser, proxyPassword); + } + + if (all) { + installer.installAll(); + System.exit(0); + } + + if (names != null) { + if (artifacts != null) { + installer.install(names.split(","), artifacts.split(",")); + } else { + installer.install(names.split(",")); + } + } + } + + private static void startTip() { + System.out.println("\n1. Restart Zeppelin" + + "\n2. Create interpreter setting in 'Interpreter' menu on Zeppelin GUI" + + "\n3. 
Then you can bind the interpreter on your note"); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java new file mode 100644 index 0000000..74a2da2 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.zeppelin.interpreter.remote;

import com.google.gson.Gson;
import org.apache.thrift.TException;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * Proxy for AngularObjectRegistry that exists in remote interpreter process
 */
public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
  Logger logger = LoggerFactory.getLogger(RemoteAngularObjectRegistry.class);
  private final InterpreterGroup interpreterGroup;

  /**
   * @param interpreterId id passed through to {@link AngularObjectRegistry}
   * @param listener listener passed through to {@link AngularObjectRegistry}
   * @param interpreterGroup group whose remote interpreter process is notified
   */
  public RemoteAngularObjectRegistry(String interpreterId,
                                     AngularObjectRegistryListener listener,
                                     InterpreterGroup interpreterGroup) {
    super(interpreterId, listener);
    this.interpreterGroup = interpreterGroup;
  }

  private RemoteInterpreterProcess getRemoteInterpreterProcess() {
    return interpreterGroup.getRemoteInterpreterProcess();
  }

  /**
   * When ZeppelinServer side code want to add angularObject to the registry,
   * this method should be used instead of add()
   *
   * <p>If the group has no remote interpreter process, or it is not running, the object
   * is only added to this registry. Otherwise the remote process is told about the
   * object (serialized as JSON) first and then it is added to this registry.
   *
   * @param name name of the angular object
   * @param o value of the angular object
   * @param noteId note the object belongs to
   * @param paragraphId paragraph the object belongs to
   * @return the object added to this registry
   */
  public AngularObject addAndNotifyRemoteProcess(final String name,
                                                 final Object o,
                                                 final String noteId,
                                                 final String paragraphId) {

    RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
    // Same guard as removeAndNotifyRemoteProcess: there may be no remote process at all.
    if (remoteInterpreterProcess == null || !remoteInterpreterProcess.isRunning()) {
      return super.add(name, o, noteId, paragraphId, true);
    }

    remoteInterpreterProcess.callRemoteFunction(
        new RemoteInterpreterProcess.RemoteFunction<Void>() {
          @Override
          public Void call(Client client) throws Exception {
            Gson gson = new Gson();
            client.angularObjectAdd(name, noteId, paragraphId, gson.toJson(o));
            return null;
          }
        }
    );

    return super.add(name, o, noteId, paragraphId, true);

  }

  /**
   * When ZeppelinServer side code want to remove angularObject from the registry,
   * this method should be used instead of remove()
   *
   * <p>If the group has no remote interpreter process, or it is not running, the object
   * is only removed from this registry. Otherwise the remote process is told to remove
   * it first and then it is removed from this registry.
   *
   * @param name name of the angular object
   * @param noteId note the object belongs to
   * @param paragraphId paragraph the object belongs to
   * @return whatever {@code remove()} of the parent registry returns
   */
  public AngularObject removeAndNotifyRemoteProcess(final String name,
                                                    final String noteId,
                                                    final String paragraphId) {
    RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
    if (remoteInterpreterProcess == null || !remoteInterpreterProcess.isRunning()) {
      return super.remove(name, noteId, paragraphId);
    }
    remoteInterpreterProcess.callRemoteFunction(
      new RemoteInterpreterProcess.RemoteFunction<Void>() {
        @Override
        public Void call(Client client) throws Exception {
          client.angularObjectRemove(name, noteId, paragraphId);
          return null;
        }
      }
    );

    return super.remove(name, noteId, paragraphId);
  }

  /**
   * Removes every object returned by {@code getAll(noteId, paragraphId)}, notifying the
   * remote process for each one.
   *
   * @param noteId note whose objects are removed
   * @param paragraphId paragraph whose objects are removed
   */
  public void removeAllAndNotifyRemoteProcess(String noteId, String paragraphId) {
    List<AngularObject> all = getAll(noteId, paragraphId);
    for (AngularObject ao : all) {
      removeAndNotifyRemoteProcess(ao.getName(), noteId, paragraphId);
    }
  }

  /**
   * Creates the {@link RemoteAngularObject} flavour of angular object, bound to this
   * registry's interpreter group and listener.
   */
  @Override
  protected AngularObject createNewAngularObject(String name, Object o, String noteId, String
          paragraphId) {
    return new RemoteAngularObject(name, o, noteId, paragraphId, interpreterGroup,
        getAngularObjectListener());
  }
}