http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java new file mode 100644 index 0000000..752b4e2 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java @@ -0,0 +1,459 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.zeppelin.dep.Dependency; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.annotations.SerializedName; +import com.google.gson.internal.StringMap; + +import static org.apache.zeppelin.notebook.utility.IdHashes.generateId; + +/** + * Interpreter settings + */ +public class InterpreterSetting { + + private static final Logger logger = LoggerFactory.getLogger(InterpreterSetting.class); + private static final String SHARED_PROCESS = "shared_process"; + private String id; + private String name; + // always be null in case of InterpreterSettingRef + private String group; + private transient Map<String, String> infos; + + // Map of the note and paragraphs which has runtime infos generated by this interpreter setting. + // This map is used to clear the infos in paragraph when the interpretersetting is restarted + private transient Map<String, Set<String>> runtimeInfosToBeCleared; + + /** + * properties can be either Map<String, DefaultInterpreterProperty> or + * Map<String, InterpreterProperty> + * properties should be: + * - Map<String, InterpreterProperty> when Interpreter instances are saved to + * `conf/interpreter.json` file + * - Map<String, DefaultInterpreterProperty> when Interpreters are registered + * : this is needed after https://github.com/apache/zeppelin/pull/1145 + * which changed the way of getting default interpreter setting AKA interpreterSettingsRef + */ + private Object properties; + private Status status; + private String errorReason; + + @SerializedName("interpreterGroup") + private List<InterpreterInfo> interpreterInfos; + private final transient Map<String, InterpreterGroup> interpreterGroupRef = new HashMap<>(); + private List<Dependency> dependencies = new LinkedList<>(); + private InterpreterOption option; + private transient String path; + + @SerializedName("runner") + private InterpreterRunner interpreterRunner; + + @Deprecated + private transient InterpreterGroupFactory interpreterGroupFactory; + + private final transient ReentrantReadWriteLock.ReadLock interpreterGroupReadLock; + private final transient ReentrantReadWriteLock.WriteLock interpreterGroupWriteLock; + + public InterpreterSetting() { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + interpreterGroupReadLock = lock.readLock(); + interpreterGroupWriteLock = lock.writeLock(); + } + + public InterpreterSetting(String id, String name, String group, + List<InterpreterInfo> interpreterInfos, Object properties, List<Dependency> dependencies, + InterpreterOption option, String path, InterpreterRunner runner) { + this(); + this.id = id; + this.name = name; + this.group = group; + this.interpreterInfos = interpreterInfos; + this.properties = properties; + this.dependencies = dependencies; + this.option = option; + this.path = path; + this.status = Status.READY; + this.interpreterRunner = runner; + } + + public InterpreterSetting(String name, String group, List<InterpreterInfo> interpreterInfos, + Object properties, List<Dependency> dependencies, InterpreterOption option, String path, + InterpreterRunner runner) { + this(generateId(), name, group, interpreterInfos, properties, dependencies, option, path, + runner); + } + + /** + * Create interpreter from interpreterSettingRef + * + * @param o interpreterSetting from interpreterSettingRef + */ + public InterpreterSetting(InterpreterSetting o) { + this(generateId(), o.getName(), o.getGroup(), o.getInterpreterInfos(), o.getProperties(), + o.getDependencies(), o.getOption(), o.getPath(), o.getInterpreterRunner()); + } + + public String getId() { + return id; + } + + public String getName() { + return name; + } + + public String getGroup() { + return group; + } + + private String getInterpreterProcessKey(String user, String noteId) { + InterpreterOption option = getOption(); + String key; + if (getOption().isExistingProcess) { + key = Constants.EXISTING_PROCESS; + } else if (getOption().isProcess()) { + key = (option.perUserIsolated() ? user : "") + ":" + (option.perNoteIsolated() ? noteId : ""); + } else { + key = SHARED_PROCESS; + } + + //logger.debug("getInterpreterProcessKey: {} for InterpreterSetting Id: {}, Name: {}", + // key, getId(), getName()); + return key; + } + + private boolean isEqualInterpreterKeyProcessKey(String refKey, String processKey) { + InterpreterOption option = getOption(); + int validCount = 0; + if (getOption().isProcess() + && !(option.perUserIsolated() == true && option.perNoteIsolated() == true)) { + + List<String> processList = Arrays.asList(processKey.split(":")); + List<String> refList = Arrays.asList(refKey.split(":")); + + if (refList.size() <= 1 || processList.size() <= 1) { + return refKey.equals(processKey); + } + + if (processList.get(0).equals("") || processList.get(0).equals(refList.get(0))) { + validCount = validCount + 1; + } + + if (processList.get(1).equals("") || processList.get(1).equals(refList.get(1))) { + validCount = validCount + 1; + } + + return (validCount >= 2); + } else { + return refKey.equals(processKey); + } + } + + String getInterpreterSessionKey(String user, String noteId) { + InterpreterOption option = getOption(); + String key; + if (option.isExistingProcess()) { + key = Constants.EXISTING_PROCESS; + } else if (option.perNoteScoped() && option.perUserScoped()) { + key = user + ":" + noteId; + } else if (option.perUserScoped()) { + key = user; + } else if (option.perNoteScoped()) { + key = noteId; + } else { + key = "shared_session"; + } + + logger.debug("Interpreter session key: {}, for note: {}, user: {}, InterpreterSetting Name: " + + "{}", key, noteId, user, getName()); + return key; + } + + public InterpreterGroup getInterpreterGroup(String user, String noteId) { + String key = getInterpreterProcessKey(user, noteId); + if (!interpreterGroupRef.containsKey(key)) { + String interpreterGroupId = getId() + ":" + key; + InterpreterGroup intpGroup = + interpreterGroupFactory.createInterpreterGroup(interpreterGroupId, getOption()); + + interpreterGroupWriteLock.lock(); + logger.debug("create interpreter group with groupId:" + interpreterGroupId); + interpreterGroupRef.put(key, intpGroup); + interpreterGroupWriteLock.unlock(); + } + try { + interpreterGroupReadLock.lock(); + return interpreterGroupRef.get(key); + } finally { + interpreterGroupReadLock.unlock(); + } + } + + public Collection<InterpreterGroup> getAllInterpreterGroups() { + try { + interpreterGroupReadLock.lock(); + return new LinkedList<>(interpreterGroupRef.values()); + } finally { + interpreterGroupReadLock.unlock(); + } + } + + void closeAndRemoveInterpreterGroup(String noteId, String user) { + if (user.equals("anonymous")) { + user = ""; + } + String processKey = getInterpreterProcessKey(user, noteId); + String sessionKey = getInterpreterSessionKey(user, noteId); + List<InterpreterGroup> groupToRemove = new LinkedList<>(); + InterpreterGroup groupItem; + for (String intpKey : new HashSet<>(interpreterGroupRef.keySet())) { + if (isEqualInterpreterKeyProcessKey(intpKey, processKey)) { + interpreterGroupWriteLock.lock(); + // TODO(jl): interpreterGroup has two or more sessionKeys inside it. thus we should not + // remove interpreterGroup if it has two or more values. + groupItem = interpreterGroupRef.get(intpKey); + interpreterGroupWriteLock.unlock(); + groupToRemove.add(groupItem); + } + for (InterpreterGroup groupToClose : groupToRemove) { + // TODO(jl): Fix the logic removing session. Now, it's handled into groupToClose.clsose() + groupToClose.close(interpreterGroupRef, intpKey, sessionKey); + } + groupToRemove.clear(); + } + + //Remove session because all interpreters in this session are closed + //TODO(jl): Change all code to handle interpreter one by one or all at once + + } + + void closeAndRemoveAllInterpreterGroups() { + for (String processKey : new HashSet<>(interpreterGroupRef.keySet())) { + InterpreterGroup interpreterGroup = interpreterGroupRef.get(processKey); + for (String sessionKey : new HashSet<>(interpreterGroup.keySet())) { + interpreterGroup.close(interpreterGroupRef, processKey, sessionKey); + } + } + } + + void shutdownAndRemoveAllInterpreterGroups() { + for (InterpreterGroup interpreterGroup : interpreterGroupRef.values()) { + interpreterGroup.shutdown(); + } + } + + public Object getProperties() { + return properties; + } + + public Properties getFlatProperties() { + Properties p = new Properties(); + if (properties != null) { + Map<String, InterpreterProperty> propertyMap = (Map<String, InterpreterProperty>) properties; + for (String key : propertyMap.keySet()) { + InterpreterProperty tmp = propertyMap.get(key); + p.put(tmp.getName() != null ? tmp.getName() : key, + tmp.getValue() != null ? tmp.getValue().toString() : null); + } + } + return p; + } + + public List<Dependency> getDependencies() { + if (dependencies == null) { + return new LinkedList<>(); + } + return dependencies; + } + + public void setDependencies(List<Dependency> dependencies) { + this.dependencies = dependencies; + } + + public InterpreterOption getOption() { + if (option == null) { + option = new InterpreterOption(); + } + + return option; + } + + public void setOption(InterpreterOption option) { + this.option = option; + } + + public String getPath() { + return path; + } + + public void setPath(String path) { + this.path = path; + } + + public List<InterpreterInfo> getInterpreterInfos() { + return interpreterInfos; + } + + void setInterpreterGroupFactory(InterpreterGroupFactory interpreterGroupFactory) { + this.interpreterGroupFactory = interpreterGroupFactory; + } + + void appendDependencies(List<Dependency> dependencies) { + for (Dependency dependency : dependencies) { + if (!this.dependencies.contains(dependency)) { + this.dependencies.add(dependency); + } + } + } + + void setInterpreterOption(InterpreterOption interpreterOption) { + this.option = interpreterOption; + } + + public void setProperties(Map<String, InterpreterProperty> p) { + this.properties = p; + } + + void setGroup(String group) { + this.group = group; + } + + void setName(String name) { + this.name = name; + } + + /*** + * Interpreter status + */ + public enum Status { + DOWNLOADING_DEPENDENCIES, + ERROR, + READY + } + + public Status getStatus() { + return status; + } + + public void setStatus(Status status) { + this.status = status; + } + + public String getErrorReason() { + return errorReason; + } + + public void setErrorReason(String errorReason) { + this.errorReason = errorReason; + } + + public void setInfos(Map<String, String> infos) { + this.infos = infos; + } + + public Map<String, String> getInfos() { + return infos; + } + + public InterpreterRunner getInterpreterRunner() { + return interpreterRunner; + } + + public void setInterpreterRunner(InterpreterRunner interpreterRunner) { + this.interpreterRunner = interpreterRunner; + } + + public void addNoteToPara(String noteId, String paraId) { + if (runtimeInfosToBeCleared == null) { + runtimeInfosToBeCleared = new HashMap<>(); + } + Set<String> paraIdSet = runtimeInfosToBeCleared.get(noteId); + if (paraIdSet == null) { + paraIdSet = new HashSet<>(); + runtimeInfosToBeCleared.put(noteId, paraIdSet); + } + paraIdSet.add(paraId); + } + + public Map<String, Set<String>> getNoteIdAndParaMap() { + return runtimeInfosToBeCleared; + } + + public void clearNoteIdAndParaMap() { + runtimeInfosToBeCleared = null; + } + + // For backward compatibility of interpreter.json format after ZEPPELIN-2654 + public void convertPermissionsFromUsersToOwners(JsonObject jsonObject) { + if (jsonObject != null) { + JsonObject option = jsonObject.getAsJsonObject("option"); + if (option != null) { + JsonArray users = option.getAsJsonArray("users"); + if (users != null) { + if (this.option.getOwners() == null) { + this.option.owners = new LinkedList<>(); + } + for (JsonElement user : users) { + this.option.getOwners().add(user.getAsString()); + } + } + } + } + } + + // For backward compatibility of interpreter.json format after ZEPPELIN-2403 + public void convertFlatPropertiesToPropertiesWithWidgets() { + StringMap newProperties = new StringMap(); + if (properties != null && properties instanceof StringMap) { + StringMap p = (StringMap) properties; + + for (Object o : p.entrySet()) { + Map.Entry entry = (Map.Entry) o; + if (!(entry.getValue() instanceof StringMap)) { + StringMap newProperty = new StringMap(); + newProperty.put("name", entry.getKey()); + newProperty.put("value", entry.getValue()); + newProperty.put("type", InterpreterPropertyType.TEXTAREA.getValue()); + newProperties.put(entry.getKey().toString(), newProperty); + } else { + // already converted + return; + } + } + + this.properties = newProperties; + } + } +}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java new file mode 100644 index 0000000..12545d6 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java @@ -0,0 +1,1136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.lang.reflect.Type; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.charset.StandardCharsets; +import java.nio.file.DirectoryStream.Filter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.attribute.PosixFilePermission; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.zeppelin.dep.Dependency; +import org.apache.zeppelin.dep.DependencyResolver; +import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter; +import org.apache.zeppelin.scheduler.Job; +import org.apache.zeppelin.scheduler.Job.Status; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.sonatype.aether.RepositoryException; +import org.sonatype.aether.repository.Authentication; +import org.sonatype.aether.repository.Proxy; +import org.sonatype.aether.repository.RemoteRepository; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.internal.StringMap; +import com.google.gson.reflect.TypeToken; + +import static java.nio.file.attribute.PosixFilePermission.OWNER_READ; +import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE; + +/** + * TBD + */ +public class InterpreterSettingManager { + + private static final Logger logger = LoggerFactory.getLogger(InterpreterSettingManager.class); + private static final String SHARED_SESSION = "shared_session"; + private static final Map<String, Object> DEFAULT_EDITOR = ImmutableMap.of( + "language", (Object) "text", + "editOnDblClick", false); + + private final ZeppelinConfiguration zeppelinConfiguration; + private final Path interpreterDirPath; + private final Path interpreterBindingPath; + + /** + * This is only references with default settings, name and properties + * key: InterpreterSetting.name + */ + private final Map<String, InterpreterSetting> interpreterSettingsRef; + /** + * This is used by creating and running Interpreters + * key: InterpreterSetting.id <- This is becuase backward compatibility + */ + private final Map<String, InterpreterSetting> interpreterSettings; + private final Map<String, List<String>> interpreterBindings; + + private final DependencyResolver dependencyResolver; + private final List<RemoteRepository> interpreterRepositories; + + private final InterpreterOption defaultOption; + + private final Map<String, URLClassLoader> cleanCl; + + @Deprecated + private String[] interpreterClassList; + private String[] interpreterGroupOrderList; + private InterpreterGroupFactory interpreterGroupFactory; + + private final Gson gson; + + public InterpreterSettingManager(ZeppelinConfiguration zeppelinConfiguration, + DependencyResolver dependencyResolver, InterpreterOption interpreterOption) + throws IOException, RepositoryException { + this.zeppelinConfiguration = zeppelinConfiguration; + this.interpreterDirPath = Paths.get(zeppelinConfiguration.getInterpreterDir()); + logger.debug("InterpreterRootPath: {}", interpreterDirPath); + this.interpreterBindingPath = Paths.get(zeppelinConfiguration.getInterpreterSettingPath()); + logger.debug("InterpreterBindingPath: {}", interpreterBindingPath); + + this.interpreterSettingsRef = Maps.newConcurrentMap(); + this.interpreterSettings = Maps.newConcurrentMap(); + this.interpreterBindings = Maps.newConcurrentMap(); + + this.dependencyResolver = dependencyResolver; + this.interpreterRepositories = dependencyResolver.getRepos(); + + this.defaultOption = interpreterOption; + + this.cleanCl = Collections.synchronizedMap(new HashMap<String, URLClassLoader>()); + + String replsConf = zeppelinConfiguration.getString(ConfVars.ZEPPELIN_INTERPRETERS); + this.interpreterClassList = replsConf.split(","); + String groupOrder = zeppelinConfiguration.getString(ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER); + this.interpreterGroupOrderList = groupOrder.split(","); + + GsonBuilder gsonBuilder = new GsonBuilder(); + gsonBuilder.setPrettyPrinting(); + this.gson = gsonBuilder.create(); + + init(); + } + + /** + * Remember this method doesn't keep current connections after being called + */ + private void loadFromFile() { + if (!Files.exists(interpreterBindingPath)) { + // nothing to read + return; + } + InterpreterInfoSaving infoSaving; + try (BufferedReader jsonReader = + Files.newBufferedReader(interpreterBindingPath, StandardCharsets.UTF_8)) { + JsonParser jsonParser = new JsonParser(); + JsonObject jsonObject = jsonParser.parse(jsonReader).getAsJsonObject(); + infoSaving = gson.fromJson(jsonObject.toString(), InterpreterInfoSaving.class); + + for (String k : infoSaving.interpreterSettings.keySet()) { + InterpreterSetting setting = infoSaving.interpreterSettings.get(k); + + setting.convertFlatPropertiesToPropertiesWithWidgets(); + + List<InterpreterInfo> infos = setting.getInterpreterInfos(); + + // Convert json StringMap to Properties + StringMap<StringMap> p = (StringMap<StringMap>) setting.getProperties(); + Map<String, InterpreterProperty> properties = new HashMap(); + for (String key : p.keySet()) { + StringMap<String> fields = (StringMap<String>) p.get(key); + String type = InterpreterPropertyType.TEXTAREA.getValue(); + try { + type = InterpreterPropertyType.byValue(fields.get("type")).getValue(); + } catch (Exception e) { + logger.warn("Incorrect type of property {} in settings {}", key, + setting.getId()); + } + properties.put(key, new InterpreterProperty(key, fields.get("value"), type)); + } + setting.setProperties(properties); + + // Always use separate interpreter process + // While we decided to turn this feature on always (without providing + // enable/disable option on GUI). + // previously created setting should turn this feature on here. + setting.getOption().setRemote(true); + + setting.convertPermissionsFromUsersToOwners( + jsonObject.getAsJsonObject("interpreterSettings").getAsJsonObject(setting.getId())); + + // Update transient information from InterpreterSettingRef + InterpreterSetting interpreterSettingObject = + interpreterSettingsRef.get(setting.getGroup()); + if (interpreterSettingObject == null) { + logger.warn("can't get InterpreterSetting " + + "Information From loaded Interpreter Setting Ref - {} ", setting.getGroup()); + continue; + } + String depClassPath = interpreterSettingObject.getPath(); + setting.setPath(depClassPath); + + for (InterpreterInfo info : infos) { + if (info.getEditor() == null) { + Map<String, Object> editor = getEditorFromSettingByClassName(interpreterSettingObject, + info.getClassName()); + info.setEditor(editor); + } + } + + setting.setInterpreterGroupFactory(interpreterGroupFactory); + + loadInterpreterDependencies(setting); + interpreterSettings.put(k, setting); + } + + interpreterBindings.putAll(infoSaving.interpreterBindings); + + if (infoSaving.interpreterRepositories != null) { + for (RemoteRepository repo : infoSaving.interpreterRepositories) { + if (!dependencyResolver.getRepos().contains(repo)) { + this.interpreterRepositories.add(repo); + } + } + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + public void saveToFile() throws IOException { + String jsonString; + + synchronized (interpreterSettings) { + InterpreterInfoSaving info = new InterpreterInfoSaving(); + info.interpreterBindings = interpreterBindings; + info.interpreterSettings = interpreterSettings; + info.interpreterRepositories = interpreterRepositories; + + jsonString = info.toJson(); + } + + if (!Files.exists(interpreterBindingPath)) { + Files.createFile(interpreterBindingPath); + + try { + Set<PosixFilePermission> permissions = EnumSet.of(OWNER_READ, OWNER_WRITE); + Files.setPosixFilePermissions(interpreterBindingPath, permissions); + } catch (UnsupportedOperationException e) { + // File system does not support Posix file permissions (likely windows) - continue anyway. + logger.warn("unable to setPosixFilePermissions on '{}'.", interpreterBindingPath); + } + } + + FileOutputStream fos = new FileOutputStream(interpreterBindingPath.toFile(), false); + OutputStreamWriter out = new OutputStreamWriter(fos); + out.append(jsonString); + out.close(); + fos.close(); + } + + //TODO(jl): Fix it to remove InterpreterGroupFactory + public void setInterpreterGroupFactory(InterpreterGroupFactory interpreterGroupFactory) { + for (InterpreterSetting setting : interpreterSettings.values()) { + setting.setInterpreterGroupFactory(interpreterGroupFactory); + } + this.interpreterGroupFactory = interpreterGroupFactory; + } + + private void init() throws InterpreterException, IOException, RepositoryException { + String interpreterJson = zeppelinConfiguration.getInterpreterJson(); + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + + if (Files.exists(interpreterDirPath)) { + for (Path interpreterDir : Files + .newDirectoryStream(interpreterDirPath, new Filter<Path>() { + @Override + public boolean accept(Path entry) throws IOException { + return Files.exists(entry) && Files.isDirectory(entry); + } + })) { + String interpreterDirString = interpreterDir.toString(); + + /** + * Register interpreter by the following ordering + * 1. Register it from path {ZEPPELIN_HOME}/interpreter/{interpreter_name}/ + * interpreter-setting.json + * 2. Register it from interpreter-setting.json in classpath + * {ZEPPELIN_HOME}/interpreter/{interpreter_name} + * 3. Register it by Interpreter.register + */ + if (!registerInterpreterFromPath(interpreterDirString, interpreterJson)) { + if (!registerInterpreterFromResource(cl, interpreterDirString, interpreterJson)) { + /* + * TODO(jongyoul) + * - Remove these codes below because of legacy code + * - Support ThreadInterpreter + */ + URLClassLoader ccl = new URLClassLoader( + recursiveBuildLibList(interpreterDir.toFile()), cl); + for (String className : interpreterClassList) { + try { + // Load classes + Class.forName(className, true, ccl); + Set<String> interpreterKeys = Interpreter.registeredInterpreters.keySet(); + for (String interpreterKey : interpreterKeys) { + if (className + .equals(Interpreter.registeredInterpreters.get(interpreterKey) + .getClassName())) { + Interpreter.registeredInterpreters.get(interpreterKey) + .setPath(interpreterDirString); + logger.info("Interpreter " + interpreterKey + " found. class=" + className); + cleanCl.put(interpreterDirString, ccl); + } + } + } catch (Throwable t) { + // nothing to do + } + } + } + } + } + } + + for (RegisteredInterpreter registeredInterpreter : Interpreter.registeredInterpreters + .values()) { + logger + .debug("Registered: {} -> {}. Properties: {}", registeredInterpreter.getInterpreterKey(), + registeredInterpreter.getClassName(), registeredInterpreter.getProperties()); + } + + // RegisteredInterpreters -> interpreterSettingRef + InterpreterInfo interpreterInfo; + for (RegisteredInterpreter r : Interpreter.registeredInterpreters.values()) { + interpreterInfo = + new InterpreterInfo(r.getClassName(), r.getName(), r.isDefaultInterpreter(), + r.getEditor()); + add(r.getGroup(), interpreterInfo, r.getProperties(), defaultOption, r.getPath(), + r.getRunner()); + } + + for (String settingId : interpreterSettingsRef.keySet()) { + InterpreterSetting setting = interpreterSettingsRef.get(settingId); + logger.info("InterpreterSettingRef name {}", setting.getName()); + } + + loadFromFile(); + + // if no interpreter settings are loaded, create default set + if (0 == interpreterSettings.size()) { + Map<String, InterpreterSetting> temp = new HashMap<>(); + InterpreterSetting interpreterSetting; + for (InterpreterSetting setting : interpreterSettingsRef.values()) { + interpreterSetting = createFromInterpreterSettingRef(setting); + temp.put(setting.getName(), interpreterSetting); + } + + for (String group : interpreterGroupOrderList) { + if (null != (interpreterSetting = temp.remove(group))) { + interpreterSettings.put(interpreterSetting.getId(), interpreterSetting); + } + } + + for (InterpreterSetting setting : temp.values()) { + interpreterSettings.put(setting.getId(), setting); + } + + saveToFile(); + } + + for (String settingId : interpreterSettings.keySet()) { + InterpreterSetting setting = interpreterSettings.get(settingId); + logger.info("InterpreterSetting group {} : id={}, name={}", setting.getGroup(), settingId, + setting.getName()); + } + } + + private boolean registerInterpreterFromResource(ClassLoader cl, String interpreterDir, + String interpreterJson) throws IOException, RepositoryException { + URL[] urls = recursiveBuildLibList(new File(interpreterDir)); + ClassLoader tempClassLoader = new URLClassLoader(urls, cl); + + Enumeration<URL> interpreterSettings = tempClassLoader.getResources(interpreterJson); + if (!interpreterSettings.hasMoreElements()) { + return false; + } + for (URL url : Collections.list(interpreterSettings)) { + try (InputStream inputStream = url.openStream()) { + logger.debug("Reading {} from {}", interpreterJson, url); + List<RegisteredInterpreter> registeredInterpreterList = + getInterpreterListFromJson(inputStream); + registerInterpreters(registeredInterpreterList, interpreterDir); + } + } + return true; + } + + private boolean registerInterpreterFromPath(String interpreterDir, String interpreterJson) + throws IOException, RepositoryException { + + Path interpreterJsonPath = Paths.get(interpreterDir, interpreterJson); + if (Files.exists(interpreterJsonPath)) { + logger.debug("Reading {}", interpreterJsonPath); + List<RegisteredInterpreter> registeredInterpreterList = + getInterpreterListFromJson(interpreterJsonPath); + registerInterpreters(registeredInterpreterList, interpreterDir); + return true; + } + return false; + } + + private List<RegisteredInterpreter> getInterpreterListFromJson(Path filename) + throws FileNotFoundException { + return getInterpreterListFromJson(new FileInputStream(filename.toFile())); + } + + private List<RegisteredInterpreter> getInterpreterListFromJson(InputStream stream) { + Type registeredInterpreterListType = new TypeToken<List<RegisteredInterpreter>>() { + }.getType(); + return gson.fromJson(new InputStreamReader(stream), registeredInterpreterListType); + } + + private void registerInterpreters(List<RegisteredInterpreter> registeredInterpreters, + String absolutePath) throws IOException, RepositoryException { + + for (RegisteredInterpreter registeredInterpreter : registeredInterpreters) { + InterpreterInfo interpreterInfo = + new InterpreterInfo(registeredInterpreter.getClassName(), registeredInterpreter.getName(), + registeredInterpreter.isDefaultInterpreter(), registeredInterpreter.getEditor()); + // use defaultOption if it is not specified in interpreter-setting.json + InterpreterOption option = registeredInterpreter.getOption() == null ? defaultOption : + registeredInterpreter.getOption(); + add(registeredInterpreter.getGroup(), interpreterInfo, registeredInterpreter.getProperties(), + option, absolutePath, registeredInterpreter.getRunner()); + } + + } + + public InterpreterSetting getDefaultInterpreterSetting(List<InterpreterSetting> settings) { + if (settings == null || settings.isEmpty()) { + return null; + } + return settings.get(0); + } + + public InterpreterSetting getDefaultInterpreterSetting(String noteId) { + return getDefaultInterpreterSetting(getInterpreterSettings(noteId)); + } + + public List<InterpreterSetting> getInterpreterSettings(String noteId) { + List<String> interpreterSettingIds = getNoteInterpreterSettingBinding(noteId); + LinkedList<InterpreterSetting> settings = new LinkedList<>(); + + Iterator<String> iter = interpreterSettingIds.iterator(); + while (iter.hasNext()) { + String id = iter.next(); + InterpreterSetting setting = get(id); + if (setting == null) { + // interpreter setting is removed from factory. remove id from here, too + iter.remove(); + } else { + settings.add(setting); + } + } + return settings; + } + + private List<String> getNoteInterpreterSettingBinding(String noteId) { + LinkedList<String> bindings = new LinkedList<>(); + List<String> settingIds = interpreterBindings.get(noteId); + if (settingIds != null) { + bindings.addAll(settingIds); + } + return bindings; + } + + private InterpreterSetting createFromInterpreterSettingRef(String name) { + Preconditions.checkNotNull(name, "reference name should be not null"); + InterpreterSetting settingRef = interpreterSettingsRef.get(name); + return createFromInterpreterSettingRef(settingRef); + } + + private InterpreterSetting createFromInterpreterSettingRef(InterpreterSetting o) { + // should return immutable objects + List<InterpreterInfo> infos = (null == o.getInterpreterInfos()) ? + new ArrayList<InterpreterInfo>() : new ArrayList<>(o.getInterpreterInfos()); + List<Dependency> deps = (null == o.getDependencies()) ? + new ArrayList<Dependency>() : new ArrayList<>(o.getDependencies()); + Map<String, InterpreterProperty> props = + convertInterpreterProperties((Map<String, DefaultInterpreterProperty>) o.getProperties()); + InterpreterOption option = InterpreterOption.fromInterpreterOption(o.getOption()); + + InterpreterSetting setting = new InterpreterSetting(o.getName(), o.getName(), + infos, props, deps, option, o.getPath(), o.getInterpreterRunner()); + setting.setInterpreterGroupFactory(interpreterGroupFactory); + return setting; + } + + private Map<String, InterpreterProperty> convertInterpreterProperties( + Map<String, DefaultInterpreterProperty> defaultProperties) { + Map<String, InterpreterProperty> properties = new HashMap<>(); + + for (String key : defaultProperties.keySet()) { + DefaultInterpreterProperty defaultInterpreterProperty = defaultProperties.get(key); + properties.put(key, new InterpreterProperty(key, defaultInterpreterProperty.getValue(), + defaultInterpreterProperty.getType())); + } + return properties; + } + + public Map<String, Object> getEditorSetting(Interpreter interpreter, String user, String noteId, + String replName) { + Map<String, Object> editor = DEFAULT_EDITOR; + String group = StringUtils.EMPTY; + try { + String defaultSettingName = getDefaultInterpreterSetting(noteId).getName(); + List<InterpreterSetting> intpSettings = getInterpreterSettings(noteId); + for (InterpreterSetting intpSetting : intpSettings) { + String[] replNameSplit = replName.split("\\."); + if (replNameSplit.length == 2) { + group = replNameSplit[0]; + } + // when replName is 'name' of interpreter + if (defaultSettingName.equals(intpSetting.getName())) { + editor = getEditorFromSettingByClassName(intpSetting, interpreter.getClassName()); + } + // when replName is 'alias name' of interpreter or 'group' of interpreter + if (replName.equals(intpSetting.getName()) || group.equals(intpSetting.getName())) { + editor = getEditorFromSettingByClassName(intpSetting, interpreter.getClassName()); + break; + } + } + } catch (NullPointerException e) { + // Use `debug` level because this log occurs frequently + logger.debug("Couldn't get interpreter editor setting"); + } + return editor; + } + + public Map<String, Object> getEditorFromSettingByClassName(InterpreterSetting intpSetting, + String className) { + List<InterpreterInfo> intpInfos = intpSetting.getInterpreterInfos(); + for (InterpreterInfo intpInfo : intpInfos) { + + if (className.equals(intpInfo.getClassName())) { + if (intpInfo.getEditor() == null) { + break; + } + return intpInfo.getEditor(); + } + } + return DEFAULT_EDITOR; + } + + private void loadInterpreterDependencies(final InterpreterSetting setting) { + setting.setStatus(InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES); + setting.setErrorReason(null); + interpreterSettings.put(setting.getId(), setting); + synchronized (interpreterSettings) { + final Thread t = new Thread() { + public void run() { + try { + // dependencies to prevent library conflict + File localRepoDir = new File(zeppelinConfiguration.getInterpreterLocalRepoPath() + "/" + + setting.getId()); + if (localRepoDir.exists()) { + try { + FileUtils.forceDelete(localRepoDir); + } catch (FileNotFoundException e) { + logger.info("A file that does not exist cannot be deleted, nothing to worry", e); + } + } + + // load dependencies + List<Dependency> deps = setting.getDependencies(); + if (deps != null) { + for (Dependency d : deps) { + File destDir = new File( + zeppelinConfiguration.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO)); + + if (d.getExclusions() != null) { + dependencyResolver.load(d.getGroupArtifactVersion(), d.getExclusions(), + new File(destDir, setting.getId())); + } else { + dependencyResolver + .load(d.getGroupArtifactVersion(), new File(destDir, setting.getId())); + } + } + } + + setting.setStatus(InterpreterSetting.Status.READY); + setting.setErrorReason(null); + } catch (Exception e) { + logger.error(String.format("Error while downloading repos for interpreter group : %s," + + " go to interpreter setting page click on edit and save it again to make " + + "this interpreter work properly. : %s", + setting.getGroup(), e.getLocalizedMessage()), e); + setting.setErrorReason(e.getLocalizedMessage()); + setting.setStatus(InterpreterSetting.Status.ERROR); + } finally { + interpreterSettings.put(setting.getId(), setting); + } + } + }; + t.start(); + } + } + + /** + * Overwrite dependency jar under local-repo/{interpreterId} + * if jar file in original path is changed + */ + private void copyDependenciesFromLocalPath(final InterpreterSetting setting) { + setting.setStatus(InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES); + interpreterSettings.put(setting.getId(), setting); + synchronized (interpreterSettings) { + final Thread t = new Thread() { + public void run() { + try { + List<Dependency> deps = setting.getDependencies(); + if (deps != null) { + for (Dependency d : deps) { + File destDir = new File( + zeppelinConfiguration.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO)); + + int numSplits = d.getGroupArtifactVersion().split(":").length; + if (!(numSplits >= 3 && numSplits <= 6)) { + dependencyResolver.copyLocalDependency(d.getGroupArtifactVersion(), + new File(destDir, setting.getId())); + } + } + } + setting.setStatus(InterpreterSetting.Status.READY); + } catch (Exception e) { + logger.error(String.format("Error while copying deps for interpreter group : %s," + + " go to interpreter setting page click on edit and save it again to make " + + "this interpreter work properly.", + setting.getGroup()), e); + setting.setErrorReason(e.getLocalizedMessage()); + setting.setStatus(InterpreterSetting.Status.ERROR); + } finally { + interpreterSettings.put(setting.getId(), setting); + } + } + }; + t.start(); + } + } + + /** + * Return ordered interpreter setting list. + * The list does not contain more than one setting from the same interpreter class. + * Order by InterpreterClass (order defined by ZEPPELIN_INTERPRETERS), Interpreter setting name + */ + public List<String> getDefaultInterpreterSettingList() { + // this list will contain default interpreter setting list + List<String> defaultSettings = new LinkedList<>(); + + // to ignore the same interpreter group + Map<String, Boolean> interpreterGroupCheck = new HashMap<>(); + + List<InterpreterSetting> sortedSettings = get(); + + for (InterpreterSetting setting : sortedSettings) { + if (defaultSettings.contains(setting.getId())) { + continue; + } + + if (!interpreterGroupCheck.containsKey(setting.getName())) { + defaultSettings.add(setting.getId()); + interpreterGroupCheck.put(setting.getName(), true); + } + } + return defaultSettings; + } + + List<RegisteredInterpreter> getRegisteredInterpreterList() { + return new ArrayList<>(Interpreter.registeredInterpreters.values()); + } + + + private boolean findDefaultInterpreter(List<InterpreterInfo> infos) { + for (InterpreterInfo interpreterInfo : infos) { + if (interpreterInfo.isDefaultInterpreter()) { + return true; + } + } + return false; + } + + public InterpreterSetting createNewSetting(String name, String group, + List<Dependency> dependencies, InterpreterOption option, Map<String, InterpreterProperty> p) + throws IOException { + if (name.indexOf(".") >= 0) { + throw new IOException("'.' is invalid for InterpreterSetting name."); + } + InterpreterSetting setting = createFromInterpreterSettingRef(group); + setting.setName(name); + setting.setGroup(group); + setting.appendDependencies(dependencies); + setting.setInterpreterOption(option); + setting.setProperties(p); + setting.setInterpreterGroupFactory(interpreterGroupFactory); + interpreterSettings.put(setting.getId(), setting); + loadInterpreterDependencies(setting); + saveToFile(); + return setting; + } + + private InterpreterSetting add(String group, InterpreterInfo interpreterInfo, + Map<String, DefaultInterpreterProperty> interpreterProperties, InterpreterOption option, + String path, InterpreterRunner runner) + throws InterpreterException, IOException, RepositoryException { + ArrayList<InterpreterInfo> infos = new ArrayList<>(); + infos.add(interpreterInfo); + return add(group, infos, new ArrayList<Dependency>(), option, interpreterProperties, path, + runner); + } + + /** + * @param group InterpreterSetting reference name + */ + public InterpreterSetting add(String group, ArrayList<InterpreterInfo> interpreterInfos, + List<Dependency> dependencies, InterpreterOption option, + Map<String, DefaultInterpreterProperty> interpreterProperties, String path, + InterpreterRunner runner) { + Preconditions.checkNotNull(group, "name should not be null"); + Preconditions.checkNotNull(interpreterInfos, "interpreterInfos should not be null"); + Preconditions.checkNotNull(dependencies, "dependencies should not be null"); + Preconditions.checkNotNull(option, "option should not be null"); + Preconditions.checkNotNull(interpreterProperties, "properties should not be null"); + + InterpreterSetting interpreterSetting; + + synchronized (interpreterSettingsRef) { + if (interpreterSettingsRef.containsKey(group)) { + interpreterSetting = interpreterSettingsRef.get(group); + + // Append InterpreterInfo + List<InterpreterInfo> infos = interpreterSetting.getInterpreterInfos(); + boolean hasDefaultInterpreter = findDefaultInterpreter(infos); + for (InterpreterInfo interpreterInfo : interpreterInfos) { + if (!infos.contains(interpreterInfo)) { + if (!hasDefaultInterpreter && interpreterInfo.isDefaultInterpreter()) { + hasDefaultInterpreter = true; + infos.add(0, interpreterInfo); + } else { + infos.add(interpreterInfo); + } + } + } + + // Append dependencies + List<Dependency> dependencyList = interpreterSetting.getDependencies(); + for (Dependency dependency : dependencies) { + if (!dependencyList.contains(dependency)) { + dependencyList.add(dependency); + } + } + + // Append properties + Map<String, DefaultInterpreterProperty> properties = + (Map<String, DefaultInterpreterProperty>) interpreterSetting.getProperties(); + for (String key : interpreterProperties.keySet()) { + if (!properties.containsKey(key)) { + properties.put(key, interpreterProperties.get(key)); + } + } + + } else { + interpreterSetting = + new InterpreterSetting(group, null, interpreterInfos, interpreterProperties, + dependencies, option, path, runner); + interpreterSettingsRef.put(group, interpreterSetting); + } + } + + if (dependencies.size() > 0) { + loadInterpreterDependencies(interpreterSetting); + } + + interpreterSetting.setInterpreterGroupFactory(interpreterGroupFactory); + return interpreterSetting; + } + + /** + * map interpreter ids into noteId + * + * @param noteId note id + * @param ids InterpreterSetting id list + */ + public void setInterpreters(String user, String noteId, List<String> ids) throws IOException { + putNoteInterpreterSettingBinding(user, noteId, ids); + } + + private void putNoteInterpreterSettingBinding(String user, String noteId, + List<String> settingList) throws IOException { + List<String> unBindedSettings = new LinkedList<>(); + + synchronized (interpreterSettings) { + List<String> oldSettings = interpreterBindings.get(noteId); + if (oldSettings != null) { + for (String oldSettingId : oldSettings) { + if (!settingList.contains(oldSettingId)) { + unBindedSettings.add(oldSettingId); + } + } + } + interpreterBindings.put(noteId, settingList); + saveToFile(); + + for (String settingId : unBindedSettings) { + InterpreterSetting setting = get(settingId); + removeInterpretersForNote(setting, user, noteId); + } + } + } + + public void removeInterpretersForNote(InterpreterSetting interpreterSetting, String user, + String noteId) { + //TODO(jl): This is only for hotfix. You should fix it as a beautiful way + InterpreterOption interpreterOption = interpreterSetting.getOption(); + if (!(InterpreterOption.SHARED.equals(interpreterOption.perNote) + && InterpreterOption.SHARED.equals(interpreterOption.perUser))) { + interpreterSetting.closeAndRemoveInterpreterGroup(noteId, ""); + } + } + + public String getInterpreterSessionKey(String user, String noteId, InterpreterSetting setting) { + InterpreterOption option = setting.getOption(); + String key; + if (option.isExistingProcess()) { + key = Constants.EXISTING_PROCESS; + } else if (option.perNoteScoped() && option.perUserScoped()) { + key = user + ":" + noteId; + } else if (option.perUserScoped()) { + key = user; + } else if (option.perNoteScoped()) { + key = noteId; + } else { + key = SHARED_SESSION; + } + + logger.debug("Interpreter session key: {}, for note: {}, user: {}, InterpreterSetting Name: " + + "{}", key, noteId, user, setting.getName()); + return key; + } + + + public List<String> getInterpreters(String noteId) { + return getNoteInterpreterSettingBinding(noteId); + } + + public void closeNote(String user, String noteId) { + // close interpreters in this note session + List<InterpreterSetting> settings = getInterpreterSettings(noteId); + if (settings == null || settings.size() == 0) { + return; + } + + logger.info("closeNote: {}", noteId); + for (InterpreterSetting setting : settings) { + removeInterpretersForNote(setting, user, noteId); + } + } + + public Map<String, InterpreterSetting> getAvailableInterpreterSettings() { + return interpreterSettingsRef; + } + + private URL[] recursiveBuildLibList(File path) throws MalformedURLException { + URL[] urls = new URL[0]; + if (path == null || !path.exists()) { + return urls; + } else if (path.getName().startsWith(".")) { + return urls; + } else if (path.isDirectory()) { + File[] files = path.listFiles(); + if (files != null) { + for (File f : files) { + urls = (URL[]) ArrayUtils.addAll(urls, recursiveBuildLibList(f)); + } + } + return urls; + } else { + return new URL[]{path.toURI().toURL()}; + } + } + + public List<RemoteRepository> getRepositories() { + return this.interpreterRepositories; + } + + public void addRepository(String id, String url, boolean snapshot, Authentication auth, + Proxy proxy) throws IOException { + dependencyResolver.addRepo(id, url, snapshot, auth, proxy); + saveToFile(); + } + + public void removeRepository(String id) throws IOException { + dependencyResolver.delRepo(id); + saveToFile(); + } + + public void removeNoteInterpreterSettingBinding(String user, String noteId) throws IOException { + List<String> settingIds = interpreterBindings.remove(noteId); + if (settingIds != null) { + for (String settingId : settingIds) { + InterpreterSetting setting = get(settingId); + if (setting != null) { + this.removeInterpretersForNote(setting, user, noteId); + } + } + } + saveToFile(); + } + + /** + * Change interpreter property and restart + */ + public void setPropertyAndRestart(String id, InterpreterOption option, + Map<String, InterpreterProperty> properties, + List<Dependency> dependencies) throws IOException { + synchronized (interpreterSettings) { + InterpreterSetting intpSetting = interpreterSettings.get(id); + if (intpSetting != null) { + try { + stopJobAllInterpreter(intpSetting); + + intpSetting.closeAndRemoveAllInterpreterGroups(); + intpSetting.setOption(option); + intpSetting.setProperties(properties); + intpSetting.setDependencies(dependencies); + loadInterpreterDependencies(intpSetting); + + saveToFile(); + } catch (Exception e) { + loadFromFile(); + throw e; + } + } else { + throw new InterpreterException("Interpreter setting id " + id + " not found"); + } + } + } + + public void restart(String settingId, String noteId, String user) { + InterpreterSetting intpSetting = interpreterSettings.get(settingId); + Preconditions.checkNotNull(intpSetting); + synchronized (interpreterSettings) { + intpSetting = interpreterSettings.get(settingId); + // Check if dependency in specified path is changed + // If it did, overwrite old dependency jar with new one + if (intpSetting != null) { + //clean up metaInfos + intpSetting.setInfos(null); + copyDependenciesFromLocalPath(intpSetting); + + stopJobAllInterpreter(intpSetting); + if (user.equals("anonymous")) { + intpSetting.closeAndRemoveAllInterpreterGroups(); + } else { + intpSetting.closeAndRemoveInterpreterGroup(noteId, user); + } + + } else { + throw new InterpreterException("Interpreter setting id " + settingId + " not found"); + } + } + } + + public void restart(String id) { + restart(id, "", "anonymous"); + } + + private void stopJobAllInterpreter(InterpreterSetting intpSetting) { + if (intpSetting != null) { + for (InterpreterGroup intpGroup : intpSetting.getAllInterpreterGroups()) { + for (List<Interpreter> interpreters : intpGroup.values()) { + for (Interpreter intp : interpreters) { + for (Job job : intp.getScheduler().getJobsRunning()) { + job.abort(); + job.setStatus(Status.ABORT); + logger.info("Job " + job.getJobName() + " aborted "); + } + for (Job job : intp.getScheduler().getJobsWaiting()) { + job.abort(); + job.setStatus(Status.ABORT); + logger.info("Job " + job.getJobName() + " aborted "); + } + } + } + } + } + } + + public InterpreterSetting get(String name) { + synchronized (interpreterSettings) { + return interpreterSettings.get(name); + } + } + + public void remove(String id) throws IOException { + synchronized (interpreterSettings) { + if (interpreterSettings.containsKey(id)) { + InterpreterSetting intp = interpreterSettings.get(id); + intp.closeAndRemoveAllInterpreterGroups(); + + interpreterSettings.remove(id); + for (List<String> settings : interpreterBindings.values()) { + Iterator<String> it = settings.iterator(); + while (it.hasNext()) { + String settingId = it.next(); + if (settingId.equals(id)) { + it.remove(); + } + } + } + saveToFile(); + } + } + + File localRepoDir = new File(zeppelinConfiguration.getInterpreterLocalRepoPath() + "/" + id); + FileUtils.deleteDirectory(localRepoDir); + } + + /** + * Get interpreter settings + */ + public List<InterpreterSetting> get() { + synchronized (interpreterSettings) { + List<InterpreterSetting> orderedSettings = new LinkedList<>(); + + Map<String, List<InterpreterSetting>> nameInterpreterSettingMap = new HashMap<>(); + for (InterpreterSetting interpreterSetting : interpreterSettings.values()) { + String group = interpreterSetting.getGroup(); + if (!nameInterpreterSettingMap.containsKey(group)) { + nameInterpreterSettingMap.put(group, new ArrayList<InterpreterSetting>()); + } + nameInterpreterSettingMap.get(group).add(interpreterSetting); + } + + for (String groupName : interpreterGroupOrderList) { + List<InterpreterSetting> interpreterSettingList = + nameInterpreterSettingMap.remove(groupName); + if (null != interpreterSettingList) { + for (InterpreterSetting interpreterSetting : interpreterSettingList) { + orderedSettings.add(interpreterSetting); + } + } + } + + List<InterpreterSetting> settings = new ArrayList<>(); + + for (List<InterpreterSetting> interpreterSettingList : nameInterpreterSettingMap.values()) { + for (InterpreterSetting interpreterSetting : interpreterSettingList) { + settings.add(interpreterSetting); + } + } + + Collections.sort(settings, new Comparator<InterpreterSetting>() { + @Override + public int compare(InterpreterSetting o1, InterpreterSetting o2) { + return o1.getName().compareTo(o2.getName()); + } + }); + + orderedSettings.addAll(settings); + + return orderedSettings; + } + } + + public void close(InterpreterSetting interpreterSetting) { + interpreterSetting.closeAndRemoveAllInterpreterGroups(); + } + + public void close() { + List<Thread> closeThreads = new LinkedList<>(); + synchronized (interpreterSettings) { + Collection<InterpreterSetting> intpSettings = interpreterSettings.values(); + for (final InterpreterSetting intpSetting : intpSettings) { + Thread t = new Thread() { + public void run() { + intpSetting.closeAndRemoveAllInterpreterGroups(); + } + }; + t.start(); + closeThreads.add(t); + } + } + + for (Thread t : closeThreads) { + try { + t.join(); + } catch (InterruptedException e) { + logger.error("Can't close interpreterGroup", e); + } + } + } + + public void shutdown() { + List<Thread> closeThreads = new LinkedList<>(); + synchronized (interpreterSettings) { + Collection<InterpreterSetting> intpSettings = interpreterSettings.values(); + for (final InterpreterSetting intpSetting : intpSettings) { + Thread t = new Thread() { + public void run() { + intpSetting.shutdownAndRemoveAllInterpreterGroups(); + } + }; + t.start(); + closeThreads.add(t); + } + } + + for (Thread t : closeThreads) { + try { + t.join(); + } catch (InterruptedException e) { + logger.error("Can't close interpreterGroup", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java new file mode 100644 index 0000000..3838f63 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter.install; + +import org.apache.commons.io.FileUtils; +import org.apache.log4j.ConsoleAppender; +import org.apache.log4j.Logger; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.dep.DependencyResolver; +import org.apache.zeppelin.util.Util; +import org.sonatype.aether.RepositoryException; +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Commandline utility to install interpreter from maven repository + */ +public class InstallInterpreter { + private final File interpreterListFile; + private final File interpreterBaseDir; + private final List<AvailableInterpreterInfo> availableInterpreters; + private final String localRepoDir; + private URL proxyUrl; + private String proxyUser; + private String proxyPassword; + + /** + * + * @param interpreterListFile + * @param interpreterBaseDir interpreter directory for installing binaries + * @throws IOException + */ + public InstallInterpreter(File interpreterListFile, File interpreterBaseDir, String localRepoDir) + throws IOException { + this.interpreterListFile = interpreterListFile; + this.interpreterBaseDir = interpreterBaseDir; + this.localRepoDir = localRepoDir; + availableInterpreters = new LinkedList<>(); + readAvailableInterpreters(); + } + + + /** + * Information for available informations + */ + private static class AvailableInterpreterInfo { + public final String name; + public final String artifact; + public final String description; + + public AvailableInterpreterInfo(String name, String artifact, String description) { + this.name = name; + this.artifact = artifact; + this.description = description; + } + } + + private void readAvailableInterpreters() throws IOException { + if (!interpreterListFile.isFile()) { + System.err.println("Can't find interpreter list " + interpreterListFile.getAbsolutePath()); + return; + } + String text = FileUtils.readFileToString(interpreterListFile); + String[] lines = text.split("\n"); + + Pattern pattern = Pattern.compile("(\\S+)\\s+(\\S+)\\s+(.*)"); + + int lineNo = 0; + for (String line : lines) { + lineNo++; + if (line == null || line.length() == 0 || line.startsWith("#")) { + continue; + } + + Matcher match = pattern.matcher(line); + if (match.groupCount() != 3) { + System.err.println("Error on line " + lineNo + ", " + line); + continue; + } + + match.find(); + + String name = match.group(1); + String artifact = match.group(2); + String description = match.group(3); + + availableInterpreters.add(new AvailableInterpreterInfo(name, artifact, description)); + } + } + + public List<AvailableInterpreterInfo> list() { + for (AvailableInterpreterInfo info : availableInterpreters) { + System.out.println(info.name + "\t\t\t" + info.description); + } + + return availableInterpreters; + } + + public void installAll() { + for (AvailableInterpreterInfo info : availableInterpreters) { + install(info.name, info.artifact); + } + } + + public void install(String [] names) { + for (String name : names) { + install(name); + } + } + + public void install(String name) { + // find artifact name + for (AvailableInterpreterInfo info : availableInterpreters) { + if (name.equals(info.name)) { + install(name, info.artifact); + return; + } + } + + throw new RuntimeException("Can't find interpreter '" + name + "'"); + } + + public void install(String [] names, String [] artifacts) { + if (names.length != artifacts.length) { + throw new RuntimeException("Length of given names and artifacts are different"); + } + + for (int i = 0; i < names.length; i++) { + install(names[i], artifacts[i]); + } + } + + public void install(String name, String artifact) { + DependencyResolver depResolver = new DependencyResolver(localRepoDir); + if (proxyUrl != null) { + depResolver.setProxy(proxyUrl, proxyUser, proxyPassword); + } + + File installDir = new File(interpreterBaseDir, name); + if (installDir.exists()) { + System.err.println("Directory " + installDir.getAbsolutePath() + + " already exists" + + "\n\nSkipped"); + return; + } + + System.out.println("Install " + name + "(" + artifact + ") to " + + installDir.getAbsolutePath() + " ... "); + + try { + depResolver.load(artifact, installDir); + System.out.println("Interpreter " + name + " installed under " + + installDir.getAbsolutePath() + "."); + startTip(); + } catch (RepositoryException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public void setProxy(URL proxyUrl, String proxyUser, String proxyPassword) { + this.proxyUrl = proxyUrl; + this.proxyUser = proxyUser; + this.proxyPassword = proxyPassword; + } + + public static void usage() { + System.out.println("Options"); + System.out.println(" -l, --list List available interpreters"); + System.out.println(" -a, --all Install all available interpreters"); + System.out.println(" -n, --name [NAMES] Install interpreters (comma separated " + + "list)" + + "e.g. md,shell,jdbc,python,angular"); + System.out.println(" -t, --artifact [ARTIFACTS] (Optional with -n) custom artifact names" + + ". " + + "(comma separated list correspond to --name) " + + "e.g. customGroup:customArtifact:customVersion"); + System.out.println(" --proxy-url [url] (Optional) proxy url. http(s)://host:port"); + System.out.println(" --proxy-user [user] (Optional) proxy user"); + System.out.println(" --proxy-password [password] (Optional) proxy password"); + } + + public static void main(String [] args) throws IOException { + if (args.length == 0) { + usage(); + return; + } + + ZeppelinConfiguration conf = ZeppelinConfiguration.create(); + InstallInterpreter installer = new InstallInterpreter( + new File(conf.getInterpreterListPath()), + new File(conf.getInterpreterDir()), + conf.getInterpreterLocalRepoPath()); + + String names = null; + String artifacts = null; + URL proxyUrl = null; + String proxyUser = null; + String proxyPassword = null; + boolean all = false; + + for (int i = 0; i < args.length; i++) { + String arg = args[i].toLowerCase(Locale.US); + switch (arg) { + case "--list": + case "-l": + installer.list(); + System.exit(0); + break; + case "--all": + case "-a": + all = true; + break; + case "--name": + case "-n": + names = args[++i]; + break; + case "--artifact": + case "-t": + artifacts = args[++i]; + break; + case "--version": + case "-v": + Util.getVersion(); + break; + case "--proxy-url": + proxyUrl = new URL(args[++i]); + break; + case "--proxy-user": + proxyUser = args[++i]; + break; + case "--proxy-password": + proxyPassword = args[++i]; + break; + case "--help": + case "-h": + usage(); + System.exit(0); + break; + default: + System.out.println("Unknown option " + arg); + } + } + + if (proxyUrl != null) { + installer.setProxy(proxyUrl, proxyUser, proxyPassword); + } + + if (all) { + installer.installAll(); + System.exit(0); + } + + if (names != null) { + if (artifacts != null) { + installer.install(names.split(","), artifacts.split(",")); + } else { + installer.install(names.split(",")); + } + } + } + + private static void startTip() { + System.out.println("\n1. Restart Zeppelin" + + "\n2. Create interpreter setting in 'Interpreter' menu on Zeppelin GUI" + + "\n3. Then you can bind the interpreter on your note"); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java new file mode 100644 index 0000000..0ac7116 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import java.util.List; + +import org.apache.thrift.TException; +import org.apache.zeppelin.display.AngularObject; +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.AngularObjectRegistryListener; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; + +/** + * Proxy for AngularObjectRegistry that exists in remote interpreter process + */ +public class RemoteAngularObjectRegistry extends AngularObjectRegistry { + Logger logger = LoggerFactory.getLogger(RemoteAngularObjectRegistry.class); + private InterpreterGroup interpreterGroup; + + public RemoteAngularObjectRegistry(String interpreterId, + AngularObjectRegistryListener listener, + InterpreterGroup interpreterGroup) { + super(interpreterId, listener); + this.interpreterGroup = interpreterGroup; + } + + private RemoteInterpreterProcess getRemoteInterpreterProcess() { + return interpreterGroup.getRemoteInterpreterProcess(); + } + + /** + * When ZeppelinServer side code want to add angularObject to the registry, + * this method should be used instead of add() + * @param name + * @param o + * @param noteId + * @return + */ + public AngularObject addAndNotifyRemoteProcess(String name, Object o, String noteId, String + paragraphId) { + Gson gson = new Gson(); + RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess(); + if (!remoteInterpreterProcess.isRunning()) { + return super.add(name, o, noteId, paragraphId, true); + } + + Client client = null; + boolean broken = false; + try { + client = remoteInterpreterProcess.getClient(); + client.angularObjectAdd(name, noteId, paragraphId, gson.toJson(o)); + return super.add(name, o, noteId, paragraphId, true); + } catch (TException e) { + broken = true; + logger.error("Error", e); + } catch (Exception e) { + logger.error("Error", e); + } finally { + if (client != null) { + remoteInterpreterProcess.releaseClient(client, broken); + } + } + return null; + } + + /** + * When ZeppelinServer side code want to remove angularObject from the registry, + * this method should be used instead of remove() + * @param name + * @param noteId + * @param paragraphId + * @return + */ + public AngularObject removeAndNotifyRemoteProcess(String name, String noteId, String + paragraphId) { + RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess(); + if (remoteInterpreterProcess == null || !remoteInterpreterProcess.isRunning()) { + return super.remove(name, noteId, paragraphId); + } + + Client client = null; + boolean broken = false; + try { + client = remoteInterpreterProcess.getClient(); + client.angularObjectRemove(name, noteId, paragraphId); + return super.remove(name, noteId, paragraphId); + } catch (TException e) { + broken = true; + logger.error("Error", e); + } catch (Exception e) { + logger.error("Error", e); + } finally { + if (client != null) { + remoteInterpreterProcess.releaseClient(client, broken); + } + } + return null; + } + + public void removeAllAndNotifyRemoteProcess(String noteId, String paragraphId) { + List<AngularObject> all = getAll(noteId, paragraphId); + for (AngularObject ao : all) { + removeAndNotifyRemoteProcess(ao.getName(), noteId, paragraphId); + } + } + + @Override + protected AngularObject createNewAngularObject(String name, Object o, String noteId, String + paragraphId) { + return new RemoteAngularObject(name, o, noteId, paragraphId, interpreterGroup, + getAngularObjectListener()); + } +}