Repository: zeppelin Updated Branches: refs/heads/master 215599cb3 -> ae1cb0527
ZEPPELIN-1770. Restart only the client user's interpreter when restarting interpreter setting ### What is this PR for? This PR restarts only the interpreter of the user who triggered the restart, rather than all users' interpreters, so that restarting won't affect other users. ### What type of PR is it? [Improvement] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-1770 ### How should this be tested? Tested manually. ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #1846 from zjffdu/ZEPPELIN-1770 and squashes the following commits: 5ee076d [Jeff Zhang] fix scoped mode and add unit test 8cb28a3 [Jeff Zhang] ZEPPELIN-1770. Restart only the client user's interpreter when restarting interpreter setting Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/ae1cb052 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/ae1cb052 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/ae1cb052 Branch: refs/heads/master Commit: ae1cb0527bc223b25761e1370618929e228183f8 Parents: 215599c Author: Jeff Zhang <zjf...@apache.org> Authored: Tue Jan 10 14:07:33 2017 +0800 Committer: Jongyoul Lee <jongy...@apache.org> Committed: Tue Jan 17 02:26:39 2017 +0900 ---------------------------------------------------------------------- .../zeppelin/rest/InterpreterRestApi.java | 4 +- .../interpreter/InterpreterFactory.java | 26 +++++-- .../interpreter/InterpreterSetting.java | 44 ++++++++++- .../interpreter/InterpreterFactoryTest.java | 78 ++++++++++++++++++-- 4 files changed, 136 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae1cb052/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java index 90a58ac..06d4752 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java @@ -38,6 +38,7 @@ import javax.ws.rs.core.Response.Status; import com.google.gson.Gson; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.zeppelin.rest.message.RestartInterpreterRequest; +import org.apache.zeppelin.utils.SecurityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.sonatype.aether.repository.RemoteRepository; @@ -178,12 +179,11 @@ public class InterpreterRestApi { @ZeppelinApi public Response restartSetting(String message, @PathParam("settingId") String settingId) { logger.info("Restart interpreterSetting {}, msg={}", settingId, message); - try { RestartInterpreterRequest request = gson.fromJson(message, RestartInterpreterRequest.class); String noteId = request == null ? 
null : request.getNoteId(); - interpreterFactory.restart(settingId, noteId); + interpreterFactory.restart(settingId, noteId, SecurityUtils.getPrincipal()); } catch (InterpreterException e) { logger.error("Exception in InterpreterRestApi while restartSetting ", e); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae1cb052/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index 8a89170..e8b6868 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -737,7 +737,7 @@ public class InterpreterFactory implements InterpreterGroupFactory { String noteId) { InterpreterOption option = interpreterSetting.getOption(); if (option.isProcess()) { - interpreterSetting.closeAndRemoveInterpreterGroup(noteId); + interpreterSetting.closeAndRemoveInterpreterGroupByNoteId(noteId); } else if (option.isSession()) { InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(user, noteId); String key = getInterpreterSessionKey(user, noteId, interpreterSetting); @@ -971,18 +971,23 @@ public class InterpreterFactory implements InterpreterGroupFactory { return noteId == null ? 
false : true; } - public void restart(String settingId, String noteId) { + public void restart(String settingId, String noteId, String user) { InterpreterSetting intpSetting = interpreterSettings.get(settingId); Preconditions.checkNotNull(intpSetting); + // restart interpreter setting in note page if (noteIdIsExist(noteId) && intpSetting.getOption().isProcess()) { - intpSetting.closeAndRemoveInterpreterGroup(noteId); + intpSetting.closeAndRemoveInterpreterGroupByNoteId(noteId); return; + } else { + // restart interpreter setting in interpreter setting page + restart(settingId, user); } - restart(settingId); + + } - public void restart(String id) { + public void restart(String id, String user) { synchronized (interpreterSettings) { InterpreterSetting intpSetting = interpreterSettings.get(id); // Check if dependency in specified path is changed @@ -993,8 +998,11 @@ public class InterpreterFactory implements InterpreterGroupFactory { copyDependenciesFromLocalPath(intpSetting); stopJobAllInterpreter(intpSetting); - - intpSetting.closeAndRemoveAllInterpreterGroups(); + if (user.equals("anonymous")) { + intpSetting.closeAndRemoveAllInterpreterGroups(); + } else { + intpSetting.closeAndRemoveInterpreterGroupByUser(user); + } } else { throw new InterpreterException("Interpreter setting id " + id + " not found"); @@ -1002,6 +1010,10 @@ public class InterpreterFactory implements InterpreterGroupFactory { } } + public void restart(String id) { + restart(id, "anonymous"); + } + private void stopJobAllInterpreter(InterpreterSetting intpSetting) { if (intpSetting != null) { for (InterpreterGroup intpGroup : intpSetting.getAllInterpreterGroups()) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae1cb052/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java index 828938c..9176ddf 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java @@ -144,6 +144,26 @@ public class InterpreterSetting { return key; } + private String getInterpreterSessionKey(String user, String noteId) { + InterpreterOption option = getOption(); + String key; + if (option.isExistingProcess()) { + key = Constants.EXISTING_PROCESS; + } else if (option.perNoteScoped() && option.perUserScoped()) { + key = user + ":" + noteId; + } else if (option.perUserScoped()) { + key = user; + } else if (option.perNoteScoped()) { + key = noteId; + } else { + key = "shared_session"; + } + + logger.debug("Interpreter session key: {}, for note: {}, user: {}, InterpreterSetting Name: " + + "{}", key, noteId, user, getName()); + return key; + } + public InterpreterGroup getInterpreterGroup(String user, String noteId) { String key = getInterpreterProcessKey(user, noteId); if (!interpreterGroupRef.containsKey(key)) { @@ -173,7 +193,7 @@ public class InterpreterSetting { } } - void closeAndRemoveInterpreterGroup(String noteId) { + void closeAndRemoveInterpreterGroupByNoteId(String noteId) { String key = getInterpreterProcessKey("", noteId); InterpreterGroup groupToRemove = null; @@ -190,10 +210,30 @@ public class InterpreterSetting { } } + void closeAndRemoveInterpreterGroupByUser(String user) { + if (user.equals("anonymous")) { + user = ""; + } + String processKey = getInterpreterProcessKey(user, ""); + String sessionKey = getInterpreterSessionKey(user, ""); + InterpreterGroup groupToRemove = null; + for (String intpKey : new HashSet<>(interpreterGroupRef.keySet())) { + if (intpKey.contains(processKey)) { + interpreterGroupWriteLock.lock(); + groupToRemove = interpreterGroupRef.remove(intpKey); + interpreterGroupWriteLock.unlock(); + } + } + + if 
(groupToRemove != null) { + groupToRemove.close(sessionKey); + } + } + void closeAndRemoveAllInterpreterGroups() { HashSet<String> groupsToRemove = new HashSet<>(interpreterGroupRef.keySet()); for (String key : groupsToRemove) { - closeAndRemoveInterpreterGroup(key); + closeAndRemoveInterpreterGroupByNoteId(key); } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ae1cb052/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java index 661459b..7522366 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java @@ -167,6 +167,79 @@ public class InterpreterFactoryTest { assertEquals("value_2", remoteInterpreter.getProperty("property_2")); } + /** + * 2 users' interpreters in scoped mode. Each user has one session. 
Restarting user1's interpreter + * won't affect user2's interpreter + * @throws Exception + */ + @Test + public void testRestartInterpreterInScopedMode() throws Exception { + factory = new InterpreterFactory(conf, new InterpreterOption(true), null, null, null, depResolver, false); + List<InterpreterSetting> all = factory.get(); + InterpreterSetting mock1Setting = null; + for (InterpreterSetting setting : all) { + if (setting.getName().equals("mock1")) { + mock1Setting = setting; + break; + } + } + mock1Setting.getOption().setPerUser("scoped"); + mock1Setting.getOption().setPerNote("shared"); + // set remote as false so that we won't create new remote interpreter process + mock1Setting.getOption().setRemote(false); + mock1Setting.getOption().setHost("localhost"); + mock1Setting.getOption().setPort(2222); + InterpreterGroup interpreterGroup = mock1Setting.getInterpreterGroup("user1", "sharedProcess"); + factory.createInterpretersForNote(mock1Setting, "user1", "sharedProcess", "user1"); + factory.createInterpretersForNote(mock1Setting, "user2", "sharedProcess", "user2"); + + LazyOpenInterpreter interpreter1 = (LazyOpenInterpreter)interpreterGroup.get("user1").get(0); + interpreter1.open(); + LazyOpenInterpreter interpreter2 = (LazyOpenInterpreter)interpreterGroup.get("user2").get(0); + interpreter2.open(); + + mock1Setting.closeAndRemoveInterpreterGroupByUser("user1"); + assertFalse(interpreter1.isOpen()); + assertTrue(interpreter2.isOpen()); + } + + /** + * 2 users' interpreters in isolated mode. Each user has one interpreterGroup. 
Restarting user1's interpreter + * won't affect user2's interpreter + * @throws Exception + */ + @Test + public void testRestartInterpreterInIsolatedMode() throws Exception { + factory = new InterpreterFactory(conf, new InterpreterOption(true), null, null, null, depResolver, false); + List<InterpreterSetting> all = factory.get(); + InterpreterSetting mock1Setting = null; + for (InterpreterSetting setting : all) { + if (setting.getName().equals("mock1")) { + mock1Setting = setting; + break; + } + } + mock1Setting.getOption().setPerUser("isolated"); + mock1Setting.getOption().setPerNote("shared"); + // set remote as false so that we won't create new remote interpreter process + mock1Setting.getOption().setRemote(false); + mock1Setting.getOption().setHost("localhost"); + mock1Setting.getOption().setPort(2222); + InterpreterGroup interpreterGroup1 = mock1Setting.getInterpreterGroup("user1", "note1"); + InterpreterGroup interpreterGroup2 = mock1Setting.getInterpreterGroup("user2", "note2"); + factory.createInterpretersForNote(mock1Setting, "user1", "note1", "shared_session"); + factory.createInterpretersForNote(mock1Setting, "user2", "note2", "shared_session"); + + LazyOpenInterpreter interpreter1 = (LazyOpenInterpreter)interpreterGroup1.get("shared_session").get(0); + interpreter1.open(); + LazyOpenInterpreter interpreter2 = (LazyOpenInterpreter)interpreterGroup2.get("shared_session").get(0); + interpreter2.open(); + + mock1Setting.closeAndRemoveInterpreterGroupByUser("user1"); + assertFalse(interpreter1.isOpen()); + assertTrue(interpreter2.isOpen()); + } + @Test public void testFactoryDefaultList() throws IOException, RepositoryException { // get default settings @@ -365,9 +438,4 @@ public class InterpreterFactoryTest { interpreterRunner = ((RemoteInterpreter) ((LazyOpenInterpreter) i).getInnerInterpreter()).getInterpreterRunner(); assertEquals(interpreterRunner, testInterpreterRunner); } - - @Test - public void interpreterRunnerAsAbsolutePathTest() { - - } }