Repository: zeppelin Updated Branches: refs/heads/master 8d4902e71 -> 2a3791020
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java index 603be2e..e1a20b5 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java @@ -17,27 +17,31 @@ package org.apache.zeppelin.notebook; +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; + +import com.google.common.collect.Maps; +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import com.google.common.collect.Sets; import org.apache.commons.io.FileUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.zeppelin.dep.Dependency; +import org.apache.zeppelin.dep.DependencyResolver; import org.apache.zeppelin.display.AngularObjectRegistry; -import org.apache.zeppelin.interpreter.AbstractInterpreterTest; -import org.apache.zeppelin.interpreter.ClassloaderInterpreter; -import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterFactory; -import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.InterpreterOption; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterResultMessage; -import org.apache.zeppelin.interpreter.InterpreterSetting; -import org.apache.zeppelin.interpreter.LazyOpenInterpreter; +import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.mock.MockInterpreter1; import org.apache.zeppelin.interpreter.mock.MockInterpreter2; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; import org.apache.zeppelin.notebook.repo.NotebookRepo; import org.apache.zeppelin.notebook.repo.VFSNotebookRepo; import org.apache.zeppelin.resource.LocalResourcePool; +import org.apache.zeppelin.resource.ResourcePoolUtils; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.scheduler.SchedulerFactory; @@ -52,35 +56,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.sonatype.aether.RepositoryException; -import java.io.File; -import java.io.IOException; -import java.util.Arrays; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; - -public class NotebookTest extends AbstractInterpreterTest implements JobListenerFactory { +public class NotebookTest implements JobListenerFactory{ private static final Logger logger = LoggerFactory.getLogger(NotebookTest.class); + private File tmpDir; + private ZeppelinConfiguration conf; private SchedulerFactory schedulerFactory; + private File notebookDir; private Notebook notebook; private NotebookRepo notebookRepo; + private InterpreterFactory factory; + private InterpreterSettingManager interpreterSettingManager; + private DependencyResolver depResolver; private NotebookAuthorization notebookAuthorization; private Credentials credentials; private AuthenticationInfo anonymous = AuthenticationInfo.ANONYMOUS; @@ -88,30 +75,57 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener @Before public void setUp() throws Exception { - System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_PUBLIC.getVarName(), "true"); - System.setProperty(ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName(), "mock1,mock2"); - super.setUp(); - schedulerFactory = SchedulerFactory.singleton(); + tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis()); + tmpDir.mkdirs(); + new File(tmpDir, "conf").mkdirs(); + notebookDir = new File(tmpDir + "/notebook"); + notebookDir.mkdirs(); + + System.setProperty(ConfVars.ZEPPELIN_CONF_DIR.getVarName(), tmpDir.toString() + "/conf"); + System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath()); + System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath()); + + conf = ZeppelinConfiguration.create(); + + this.schedulerFactory = new SchedulerFactory(); + + depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo"); + interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(false)); + factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager); + + ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>(); + interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>())); + interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(), + Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null); + interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(), new HashMap<String, InterpreterProperty>()); + + ArrayList<InterpreterInfo> interpreterInfos2 = new ArrayList<>(); + interpreterInfos2.add(new InterpreterInfo(MockInterpreter2.class.getName(), "mock2", true, new HashMap<String, Object>())); + interpreterSettingManager.add("mock2", interpreterInfos2, new ArrayList<Dependency>(), new InterpreterOption(), + Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock2", null); + interpreterSettingManager.createNewSetting("mock2", "mock2", new ArrayList<Dependency>(), new InterpreterOption(), new HashMap<String, InterpreterProperty>()); + SearchService search = mock(SearchService.class); notebookRepo = new VFSNotebookRepo(conf); notebookAuthorization = NotebookAuthorization.init(conf); credentials = new Credentials(conf.credentialsPersist(), conf.getCredentialsPath()); - notebook = new Notebook(conf, notebookRepo, schedulerFactory, interpreterFactory, interpreterSettingManager, this, search, + notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, interpreterSettingManager, this, search, notebookAuthorization, credentials); + System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_PUBLIC.getVarName(), "true"); } @After public void tearDown() throws Exception { - super.tearDown(); + delete(tmpDir); } @Test public void testSelectingReplImplementation() throws IOException { Note note = notebook.createNote(anonymous); - interpreterSettingManager.setInterpreterBinding(anonymous.getUser(), note.getId(), interpreterSettingManager.getInterpreterSettingIds()); + interpreterSettingManager.setInterpreters(anonymous.getUser(), note.getId(), interpreterSettingManager.getDefaultInterpreterSettingList()); // run with default repl Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); @@ -245,7 +259,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener Notebook notebook2 = new Notebook( conf, notebookRepo, schedulerFactory, - new InterpreterFactory(interpreterSettingManager), + new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager), interpreterSettingManager, null, null, null, null); assertEquals(1, notebook2.getAllNotes().size()); @@ -302,7 +316,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener @Test public void testRunAll() throws IOException { Note note = notebook.createNote(anonymous); - interpreterSettingManager.setInterpreterBinding("user", note.getId(), interpreterSettingManager.getInterpreterSettingIds()); + interpreterSettingManager.setInterpreters("user", note.getId(), interpreterSettingManager.getDefaultInterpreterSettingList()); // p1 Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); @@ -341,7 +355,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener public void testSchedule() throws InterruptedException, IOException { // create a note and a paragraph Note note = notebook.createNote(anonymous); - interpreterSettingManager.setInterpreterBinding("user", note.getId(), interpreterSettingManager.getInterpreterSettingIds()); + interpreterSettingManager.setInterpreters("user", note.getId(), interpreterSettingManager.getDefaultInterpreterSettingList()); Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); Map config = new HashMap<>(); @@ -414,8 +428,8 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener public void testAutoRestartInterpreterAfterSchedule() throws InterruptedException, IOException{ // create a note and a paragraph Note note = notebook.createNote(anonymous); - interpreterSettingManager.setInterpreterBinding(anonymous.getUser(), note.getId(), interpreterSettingManager.getInterpreterSettingIds()); - + interpreterSettingManager.setInterpreters(anonymous.getUser(), note.getId(), interpreterSettingManager.getDefaultInterpreterSettingList()); + Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); Map config = new HashMap<>(); p.setConfig(config); @@ -435,11 +449,11 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener MockInterpreter1 mock1 = ((MockInterpreter1) (((ClassloaderInterpreter) - ((LazyOpenInterpreter) interpreterFactory.getInterpreter(anonymous.getUser(), note.getId(), "mock1")).getInnerInterpreter()) + ((LazyOpenInterpreter) factory.getInterpreter(anonymous.getUser(), note.getId(), "mock1")).getInnerInterpreter()) .getInnerInterpreter())); MockInterpreter2 mock2 = ((MockInterpreter2) (((ClassloaderInterpreter) - ((LazyOpenInterpreter) interpreterFactory.getInterpreter(anonymous.getUser(), note.getId(), "mock2")).getInnerInterpreter()) + ((LazyOpenInterpreter) factory.getInterpreter(anonymous.getUser(), note.getId(), "mock2")).getInnerInterpreter()) .getInnerInterpreter())); // wait until interpreters are started @@ -453,9 +467,9 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener } // remove cron scheduler. -// config.put("cron", null); -// note.setConfig(config); -// notebook.refreshCron(note.getId()); + config.put("cron", null); + note.setConfig(config); + notebook.refreshCron(note.getId()); // make sure all paragraph has been executed assertNotNull(p.getDateFinished()); @@ -467,7 +481,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener public void testExportAndImportNote() throws IOException, CloneNotSupportedException, InterruptedException, InterpreterException, SchedulerException, RepositoryException { Note note = notebook.createNote(anonymous); - interpreterSettingManager.setInterpreterBinding("user", note.getId(), interpreterSettingManager.getInterpreterSettingIds()); + interpreterSettingManager.setInterpreters("user", note.getId(), interpreterSettingManager.getDefaultInterpreterSettingList()); final Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); String simpleText = "hello world"; @@ -506,7 +520,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener public void testCloneNote() throws IOException, CloneNotSupportedException, InterruptedException, InterpreterException, SchedulerException, RepositoryException { Note note = notebook.createNote(anonymous); - interpreterSettingManager.setInterpreterBinding("user", note.getId(), interpreterSettingManager.getInterpreterSettingIds()); + interpreterSettingManager.setInterpreters("user", note.getId(), interpreterSettingManager.getDefaultInterpreterSettingList()); final Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); p.setText("hello world"); @@ -540,7 +554,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener public void testCloneNoteWithNoName() throws IOException, CloneNotSupportedException, InterruptedException { Note note = notebook.createNote(anonymous); - interpreterSettingManager.setInterpreterBinding(anonymous.getUser(), note.getId(), interpreterSettingManager.getInterpreterSettingIds()); + interpreterSettingManager.setInterpreters(anonymous.getUser(), note.getId(), interpreterSettingManager.getDefaultInterpreterSettingList()); Note cloneNote = notebook.cloneNote(note.getId(), null, anonymous); assertEquals(cloneNote.getName(), "Note " + cloneNote.getId()); @@ -552,7 +566,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener public void testCloneNoteWithExceptionResult() throws IOException, CloneNotSupportedException, InterruptedException { Note note = notebook.createNote(anonymous); - interpreterSettingManager.setInterpreterBinding(anonymous.getUser(), note.getId(), interpreterSettingManager.getInterpreterSettingIds()); + interpreterSettingManager.setInterpreters(anonymous.getUser(), note.getId(), interpreterSettingManager.getDefaultInterpreterSettingList()); final Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); p.setText("hello world"); @@ -577,28 +591,28 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener @Test public void testResourceRemovealOnParagraphNoteRemove() throws IOException { Note note = notebook.createNote(anonymous); - interpreterSettingManager.setInterpreterBinding(anonymous.getUser(), note.getId(), interpreterSettingManager.getInterpreterSettingIds()); - + interpreterSettingManager.setInterpreters(anonymous.getUser(), note.getId(), interpreterSettingManager.getDefaultInterpreterSettingList()); + for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) { + intpGroup.setResourcePool(new LocalResourcePool(intpGroup.getId())); + } Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); p1.setText("hello"); Paragraph p2 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); p2.setText("%mock2 world"); - for (InterpreterGroup intpGroup : interpreterSettingManager.getAllInterpreterGroup()) { - intpGroup.setResourcePool(new LocalResourcePool(intpGroup.getId())); - } + note.runAll(); while (p1.isTerminated() == false || p1.getResult() == null) Thread.yield(); while (p2.isTerminated() == false || p2.getResult() == null) Thread.yield(); - assertEquals(2, interpreterSettingManager.getAllResources().size()); + assertEquals(2, ResourcePoolUtils.getAllResources().size()); // remove a paragraph note.removeParagraph(anonymous.getUser(), p1.getId()); - assertEquals(1, interpreterSettingManager.getAllResources().size()); + assertEquals(1, ResourcePoolUtils.getAllResources().size()); // remove note notebook.removeNote(note.getId(), anonymous); - assertEquals(0, interpreterSettingManager.getAllResources().size()); + assertEquals(0, ResourcePoolUtils.getAllResources().size()); } @Test @@ -606,10 +620,10 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener IOException { // create a note and a paragraph Note note = notebook.createNote(anonymous); - interpreterSettingManager.setInterpreterBinding(anonymous.getUser(), note.getId(), interpreterSettingManager.getInterpreterSettingIds()); + interpreterSettingManager.setInterpreters(anonymous.getUser(), note.getId(), interpreterSettingManager.getDefaultInterpreterSettingList()); AngularObjectRegistry registry = interpreterSettingManager - .getInterpreterSettings(note.getId()).get(0).getOrCreateInterpreterGroup(anonymous.getUser(), "sharedProcess") + .getInterpreterSettings(note.getId()).get(0).getInterpreterGroup(anonymous.getUser(), "sharedProcess") .getAngularObjectRegistry(); Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); @@ -639,10 +653,10 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener IOException { // create a note and a paragraph Note note = notebook.createNote(anonymous); - interpreterSettingManager.setInterpreterBinding(anonymous.getUser(), note.getId(), interpreterSettingManager.getInterpreterSettingIds()); + interpreterSettingManager.setInterpreters(anonymous.getUser(), note.getId(), interpreterSettingManager.getDefaultInterpreterSettingList()); AngularObjectRegistry registry = interpreterSettingManager - .getInterpreterSettings(note.getId()).get(0).getOrCreateInterpreterGroup(anonymous.getUser(), "sharedProcess") + .getInterpreterSettings(note.getId()).get(0).getInterpreterGroup(anonymous.getUser(), "sharedProcess") .getAngularObjectRegistry(); Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); @@ -673,10 +687,10 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener IOException { // create a note and a paragraph Note note = notebook.createNote(anonymous); - interpreterSettingManager.setInterpreterBinding(anonymous.getUser(), note.getId(), interpreterSettingManager.getInterpreterSettingIds()); + interpreterSettingManager.setInterpreters(anonymous.getUser(), note.getId(), interpreterSettingManager.getDefaultInterpreterSettingList()); AngularObjectRegistry registry = interpreterSettingManager - .getInterpreterSettings(note.getId()).get(0).getOrCreateInterpreterGroup(anonymous.getUser(), "sharedProcess") + .getInterpreterSettings(note.getId()).get(0).getInterpreterGroup(anonymous.getUser(), "sharedProcess") .getAngularObjectRegistry(); // add local scope object @@ -686,13 +700,14 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener // restart interpreter interpreterSettingManager.restart(interpreterSettingManager.getInterpreterSettings(note.getId()).get(0).getId()); - registry = interpreterSettingManager.getInterpreterSettings(note.getId()).get(0) - .getOrCreateInterpreterGroup(anonymous.getUser(), "sharedProcess") - .getAngularObjectRegistry(); - - // New InterpreterGroup will be created and its AngularObjectRegistry will be created - assertNull(registry.get("o1", note.getId(), null)); - assertNull(registry.get("o2", null, null)); + registry = interpreterSettingManager.getInterpreterSettings(note.getId()).get(0).getInterpreterGroup(anonymous.getUser(), "sharedProcess") + .getAngularObjectRegistry(); + + // local and global scope object should be removed + // But InterpreterGroup does not implement angularObjectRegistry per session (scoped, isolated) + // So for now, does not have good way to remove all objects in particular session on restart. + assertNotNull(registry.get("o1", note.getId(), null)); + assertNotNull(registry.get("o2", null, null)); notebook.removeNote(note.getId(), anonymous); } @@ -787,7 +802,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener public void testAbortParagraphStatusOnInterpreterRestart() throws InterruptedException, IOException { Note note = notebook.createNote(anonymous); - interpreterSettingManager.setInterpreterBinding(anonymous.getUser(), note.getId(), interpreterSettingManager.getInterpreterSettingIds()); + interpreterSettingManager.setInterpreters(anonymous.getUser(), note.getId(), interpreterSettingManager.getDefaultInterpreterSettingList()); // create three paragraphs Paragraph p1 = note.addNewParagraph(anonymous); @@ -811,11 +826,11 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener interpreterSettingManager.restart(interpreterSettingManager.getInterpreterSettings(note.getId()).get(0).getId()); // make sure three differnt status aborted well. -// assertEquals(Status.FINISHED, p1.getStatus()); -// assertEquals(Status.ABORT, p2.getStatus()); -// assertEquals(Status.ABORT, p3.getStatus()); -// -// notebook.removeNote(note.getId(), anonymous); + assertEquals(Status.FINISHED, p1.getStatus()); + assertEquals(Status.ABORT, p2.getStatus()); + assertEquals(Status.ABORT, p3.getStatus()); + + notebook.removeNote(note.getId(), anonymous); } @Test http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java index 7c85778..7bd6819 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java @@ -21,7 +21,6 @@ package org.apache.zeppelin.notebook; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; -import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; @@ -40,7 +39,7 @@ import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectBuilder; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.Input; -import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.Interpreter.FormType; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterGroup; @@ -48,7 +47,10 @@ import org.apache.zeppelin.interpreter.InterpreterOption; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.InterpreterResult.Type; +import org.apache.zeppelin.interpreter.InterpreterResultMessage; +import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.interpreter.InterpreterSetting.Status; +import org.apache.zeppelin.interpreter.InterpreterSettingManager; import org.apache.zeppelin.resource.ResourcePool; import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.user.Credentials; @@ -56,7 +58,6 @@ import org.junit.Test; import java.util.HashMap; import java.util.Map; - import org.mockito.Mockito; public class ParagraphTest { @@ -185,7 +186,7 @@ public class ParagraphTest { when(mockInterpreterOption.permissionIsSet()).thenReturn(false); when(mockInterpreterSetting.getStatus()).thenReturn(Status.READY); when(mockInterpreterSetting.getId()).thenReturn("mock_id_1"); - when(mockInterpreterSetting.getOrCreateInterpreterGroup(anyString(), anyString())).thenReturn(mockInterpreterGroup); + when(mockInterpreterSetting.getInterpreterGroup(anyString(), anyString())).thenReturn(mockInterpreterGroup); spyInterpreterSettingList.add(mockInterpreterSetting); when(mockNote.getId()).thenReturn("any_id"); when(mockInterpreterSettingManager.getInterpreterSettings(anyString())).thenReturn(spyInterpreterSettingList); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java index 14c8789..803912e 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java @@ -33,13 +33,10 @@ import org.apache.commons.io.FileUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.dep.DependencyResolver; -import org.apache.zeppelin.display.AngularObjectRegistryListener; -import org.apache.zeppelin.helium.ApplicationEventListener; import org.apache.zeppelin.interpreter.InterpreterFactory; import org.apache.zeppelin.interpreter.InterpreterOption; import org.apache.zeppelin.interpreter.InterpreterResultMessage; import org.apache.zeppelin.interpreter.InterpreterSettingManager; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; import org.apache.zeppelin.notebook.*; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; @@ -72,7 +69,7 @@ public class NotebookRepoSyncTest implements JobListenerFactory { private Credentials credentials; private AuthenticationInfo anonymous; private static final Logger LOG = LoggerFactory.getLogger(NotebookRepoSyncTest.class); - + @Before public void setUp() throws Exception { String zpath = System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis(); @@ -94,13 +91,12 @@ public class NotebookRepoSyncTest implements JobListenerFactory { LOG.info("secondary note dir : " + secNotePath); conf = ZeppelinConfiguration.create(); - this.schedulerFactory = SchedulerFactory.singleton(); + this.schedulerFactory = new SchedulerFactory(); depResolver = new DependencyResolver(mainZepDir.getAbsolutePath() + "/local-repo"); - interpreterSettingManager = new InterpreterSettingManager(conf, - mock(AngularObjectRegistryListener.class), mock(RemoteInterpreterProcessListener.class), mock(ApplicationEventListener.class)); - factory = new InterpreterFactory(interpreterSettingManager); - + interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true)); + factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager); + search = mock(SearchService.class); notebookRepoSync = new NotebookRepoSync(conf); notebookAuthorization = NotebookAuthorization.init(conf); @@ -114,19 +110,19 @@ public class NotebookRepoSyncTest implements JobListenerFactory { public void tearDown() throws Exception { delete(mainZepDir); } - + @Test public void testRepoCount() throws IOException { assertTrue(notebookRepoSync.getMaxRepoNum() >= notebookRepoSync.getRepoCount()); } - + @Test public void testSyncOnCreate() throws IOException { /* check that both storage systems are empty */ assertTrue(notebookRepoSync.getRepoCount() > 1); assertEquals(0, notebookRepoSync.list(0, anonymous).size()); assertEquals(0, notebookRepoSync.list(1, anonymous).size()); - + /* create note */ Note note = notebookSync.createNote(anonymous); @@ -134,7 +130,7 @@ public class NotebookRepoSyncTest implements JobListenerFactory { assertEquals(1, notebookRepoSync.list(0, anonymous).size()); assertEquals(1, notebookRepoSync.list(1, anonymous).size()); assertEquals(notebookRepoSync.list(0, anonymous).get(0).getId(),notebookRepoSync.list(1, anonymous).get(0).getId()); - + notebookSync.removeNote(notebookRepoSync.list(0, null).get(0).getId(), anonymous); } @@ -144,26 +140,26 @@ public class NotebookRepoSyncTest implements JobListenerFactory { assertTrue(notebookRepoSync.getRepoCount() > 1); assertEquals(0, notebookRepoSync.list(0, anonymous).size()); assertEquals(0, notebookRepoSync.list(1, anonymous).size()); - + Note note = notebookSync.createNote(anonymous); /* check that created in both storage systems */ assertEquals(1, notebookRepoSync.list(0, anonymous).size()); assertEquals(1, notebookRepoSync.list(1, anonymous).size()); assertEquals(notebookRepoSync.list(0, anonymous).get(0).getId(),notebookRepoSync.list(1, anonymous).get(0).getId()); - + /* remove Note */ notebookSync.removeNote(notebookRepoSync.list(0, anonymous).get(0).getId(), anonymous); - + /* check that deleted in both storages */ assertEquals(0, notebookRepoSync.list(0, anonymous).size()); assertEquals(0, notebookRepoSync.list(1, anonymous).size()); - + } - + @Test public void testSyncUpdateMain() throws IOException { - + /* create note */ Note note = notebookSync.createNote(anonymous); Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); @@ -171,19 +167,19 @@ public class NotebookRepoSyncTest implements JobListenerFactory { config.put("enabled", true); p1.setConfig(config); p1.setText("hello world"); - + /* new paragraph exists in note instance */ assertEquals(1, note.getParagraphs().size()); - + /* new paragraph not yet saved into storages */ assertEquals(0, notebookRepoSync.get(0, notebookRepoSync.list(0, anonymous).get(0).getId(), anonymous).getParagraphs().size()); assertEquals(0, notebookRepoSync.get(1, notebookRepoSync.list(1, anonymous).get(0).getId(), anonymous).getParagraphs().size()); - - /* save to storage under index 0 (first storage) */ + + /* save to storage under index 0 (first storage) */ notebookRepoSync.save(0, note, anonymous); - + /* check paragraph saved to first storage */ assertEquals(1, notebookRepoSync.get(0, notebookRepoSync.list(0, anonymous).get(0).getId(), anonymous).getParagraphs().size()); @@ -288,45 +284,45 @@ public class NotebookRepoSyncTest implements JobListenerFactory { // one git versioned storage initialized assertThat(vRepoSync.getRepoCount()).isEqualTo(1); assertThat(vRepoSync.getRepo(0)).isInstanceOf(GitNotebookRepo.class); - + GitNotebookRepo gitRepo = (GitNotebookRepo) vRepoSync.getRepo(0); - + // no notes assertThat(vRepoSync.list(anonymous).size()).isEqualTo(0); // create note Note note = vNotebookSync.createNote(anonymous); assertThat(vRepoSync.list(anonymous).size()).isEqualTo(1); - + String noteId = vRepoSync.list(anonymous).get(0).getId(); // first checkpoint vRepoSync.checkpoint(noteId, "checkpoint message", anonymous); int vCount = gitRepo.revisionHistory(noteId, anonymous).size(); assertThat(vCount).isEqualTo(1); - + Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); Map<String, Object> config = p.getConfig(); config.put("enabled", true); p.setConfig(config); p.setText("%md checkpoint test"); - + // save and checkpoint again vRepoSync.save(note, anonymous); vRepoSync.checkpoint(noteId, "checkpoint message 2", anonymous); assertThat(gitRepo.revisionHistory(noteId, anonymous).size()).isEqualTo(vCount + 1); notebookRepoSync.remove(note.getId(), anonymous); } - + @Test public void testSyncWithAcl() throws IOException { /* scenario 1 - note exists with acl on main storage */ AuthenticationInfo user1 = new AuthenticationInfo("user1"); Note note = notebookSync.createNote(user1); assertEquals(0, note.getParagraphs().size()); - + // saved on both storages assertEquals(1, notebookRepoSync.list(0, null).size()); assertEquals(1, notebookRepoSync.list(1, null).size()); - + /* check that user1 is the only owner */ NotebookAuthorization authInfo = NotebookAuthorization.getInstance(); Set<String> entity = new HashSet<String>(); @@ -335,23 +331,23 @@ public class NotebookRepoSyncTest implements JobListenerFactory { assertEquals(1, authInfo.getOwners(note.getId()).size()); assertEquals(0, authInfo.getReaders(note.getId()).size()); assertEquals(0, authInfo.getWriters(note.getId()).size()); - + /* update note and save on secondary storage */ Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); p1.setText("hello world"); assertEquals(1, note.getParagraphs().size()); notebookRepoSync.save(1, note, null); - + /* check paragraph isn't saved into first storage */ assertEquals(0, notebookRepoSync.get(0, notebookRepoSync.list(0, null).get(0).getId(), null).getParagraphs().size()); /* check paragraph is saved into second storage */ assertEquals(1, notebookRepoSync.get(1, notebookRepoSync.list(1, null).get(0).getId(), null).getParagraphs().size()); - + /* now sync by user1 */ notebookRepoSync.sync(user1); - + /* check that note updated and acl are same on main storage*/ assertEquals(1, notebookRepoSync.get(0, notebookRepoSync.list(0, null).get(0).getId(), null).getParagraphs().size()); @@ -359,7 +355,7 @@ public class NotebookRepoSyncTest implements JobListenerFactory { assertEquals(1, authInfo.getOwners(note.getId()).size()); assertEquals(0, authInfo.getReaders(note.getId()).size()); assertEquals(0, authInfo.getWriters(note.getId()).size()); - + /* scenario 2 - note doesn't exist on main storage */ /* remove from main storage */ notebookRepoSync.remove(0, note.getId(), user1); @@ -369,7 +365,7 @@ public class NotebookRepoSyncTest implements JobListenerFactory { assertEquals(0, authInfo.getOwners(note.getId()).size()); assertEquals(0, authInfo.getReaders(note.getId()).size()); assertEquals(0, authInfo.getWriters(note.getId()).size()); - + /* now sync - should bring note from secondary storage with added acl */ notebookRepoSync.sync(user1); assertEquals(1, notebookRepoSync.list(0, null).size()); @@ -427,5 +423,5 @@ public class NotebookRepoSyncTest implements JobListenerFactory { } }; } - + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java index b393589..6f85bf6 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java @@ -22,15 +22,18 @@ import static org.mockito.Mockito.mock; import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; +import com.google.common.collect.Maps; import org.apache.commons.io.FileUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; - import org.apache.zeppelin.dep.Dependency; import org.apache.zeppelin.dep.DependencyResolver; -import org.apache.zeppelin.interpreter.AbstractInterpreterTest; import org.apache.zeppelin.interpreter.InterpreterFactory; import org.apache.zeppelin.interpreter.InterpreterInfo; import org.apache.zeppelin.interpreter.InterpreterOption; @@ -38,7 +41,6 @@ import org.apache.zeppelin.interpreter.DefaultInterpreterProperty; import org.apache.zeppelin.interpreter.InterpreterProperty; import org.apache.zeppelin.interpreter.InterpreterSettingManager; import org.apache.zeppelin.interpreter.mock.MockInterpreter1; - import org.apache.zeppelin.notebook.JobListenerFactory; import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.Notebook; @@ -56,34 +58,59 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.ImmutableMap; -public class VFSNotebookRepoTest extends AbstractInterpreterTest implements JobListenerFactory { - +public class VFSNotebookRepoTest implements JobListenerFactory { private static final Logger LOG = LoggerFactory.getLogger(VFSNotebookRepoTest.class); - + private ZeppelinConfiguration conf; private SchedulerFactory schedulerFactory; private Notebook notebook; private NotebookRepo notebookRepo; + private InterpreterSettingManager interpreterSettingManager; + private InterpreterFactory factory; + private DependencyResolver depResolver; private NotebookAuthorization notebookAuthorization; + private File mainZepDir; + private File mainNotebookDir; + @Before public void setUp() throws Exception { - System.setProperty(ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName(), "mock1,mock2"); + String zpath = System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis(); + mainZepDir = new File(zpath); + mainZepDir.mkdirs(); + new File(mainZepDir, "conf").mkdirs(); + String mainNotePath = zpath + "/notebook"; + mainNotebookDir = new File(mainNotePath); + mainNotebookDir.mkdirs(); + + System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), mainZepDir.getAbsolutePath()); + System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), mainNotebookDir.getAbsolutePath()); System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE.getVarName(), "org.apache.zeppelin.notebook.repo.VFSNotebookRepo"); + conf = ZeppelinConfiguration.create(); + + this.schedulerFactory = new SchedulerFactory(); + + this.schedulerFactory = new SchedulerFactory(); + depResolver = new DependencyResolver(mainZepDir.getAbsolutePath() + "/local-repo"); + interpreterSettingManager = new InterpreterSettingManager(conf, depResolver, new InterpreterOption(true)); + factory = new InterpreterFactory(conf, null, null, null, depResolver, false, interpreterSettingManager); - super.setUp(); + ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>(); + interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), "mock1", true, new HashMap<String, Object>())); + interpreterSettingManager.add("mock1", interpreterInfos, new ArrayList<Dependency>(), new InterpreterOption(), + Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null); + interpreterSettingManager.createNewSetting("mock1", "mock1", new ArrayList<Dependency>(), new InterpreterOption(), new HashMap<String, InterpreterProperty>()); - this.schedulerFactory = SchedulerFactory.singleton(); SearchService search = mock(SearchService.class); notebookRepo = new VFSNotebookRepo(conf); notebookAuthorization = NotebookAuthorization.init(conf); - notebook = new Notebook(conf, notebookRepo, schedulerFactory, interpreterFactory, interpreterSettingManager, this, search, + notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, interpreterSettingManager, this, search, notebookAuthorization, null); } @After public void tearDown() throws Exception { - if (!FileUtils.deleteQuietly(testRootDir)) { - LOG.error("Failed to delete {} ", testRootDir.getName()); + if (!FileUtils.deleteQuietly(mainZepDir)) { + LOG.error("Failed to delete {} ", mainZepDir.getName()); } } @@ -93,7 +120,7 @@ public class VFSNotebookRepoTest extends AbstractInterpreterTest implements JobL int numNotes = notebookRepo.list(null).size(); // when create invalid json file - File testNoteDir = new File(notebookDir, "test"); + File testNoteDir = new File(mainNotebookDir, "test"); testNoteDir.mkdir(); FileUtils.writeStringToFile(new File(testNoteDir, "note.json"), ""); @@ -105,7 +132,7 @@ public class VFSNotebookRepoTest extends AbstractInterpreterTest implements JobL public void testSaveNotebook() throws IOException, InterruptedException { AuthenticationInfo anonymous = new AuthenticationInfo("anonymous"); Note note = notebook.createNote(anonymous); - interpreterSettingManager.setInterpreterBinding("user", note.getId(), interpreterSettingManager.getInterpreterSettingIds()); + interpreterSettingManager.setInterpreters("user", note.getId(), interpreterSettingManager.getDefaultInterpreterSettingList()); Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); Map<String, Object> config = p1.getConfig(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java index 7223109..46134e5 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java @@ -17,34 +17,35 @@ package org.apache.zeppelin.resource; import com.google.gson.Gson; -import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller; import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterResourcePool; -import org.apache.zeppelin.user.AuthenticationInfo; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.File; -import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedList; -import java.util.List; +import java.util.Properties; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; /** * Unittest for DistributedResourcePool */ -public class DistributedResourcePoolTest extends AbstractInterpreterTest { - +public class DistributedResourcePoolTest { + private static final String INTERPRETER_SCRIPT = + System.getProperty("os.name").startsWith("Windows") ? + "../bin/interpreter.cmd" : + "../bin/interpreter.sh"; + private InterpreterGroup intpGroup1; + private InterpreterGroup intpGroup2; + private HashMap<String, String> env; private RemoteInterpreter intp1; private RemoteInterpreter intp2; private InterpreterContext context; @@ -54,10 +55,50 @@ public class DistributedResourcePoolTest extends AbstractInterpreterTest { @Before public void setUp() throws Exception { - super.setUp(); - InterpreterSetting interpreterSetting = interpreterSettingManager.getByName("mock_resource_pool"); - intp1 = (RemoteInterpreter) interpreterSetting.getInterpreter("user1", "note1", "mock_resource_pool"); - intp2 = (RemoteInterpreter) interpreterSetting.getInterpreter("user2", "note1", "mock_resource_pool"); + env = new HashMap<>(); + env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); + + Properties p = new Properties(); + + intp1 = new RemoteInterpreter( + p, + "note", + MockInterpreterResourcePool.class.getName(), + new File(INTERPRETER_SCRIPT).getAbsolutePath(), + "fake", + "fakeRepo", + env, + 10 * 1000, + null, + null, + "anonymous", + false + ); + + intpGroup1 = new InterpreterGroup("intpGroup1"); + intpGroup1.put("note", new LinkedList<Interpreter>()); + intpGroup1.get("note").add(intp1); + intp1.setInterpreterGroup(intpGroup1); + + intp2 = new RemoteInterpreter( + p, + "note", + MockInterpreterResourcePool.class.getName(), + new File(INTERPRETER_SCRIPT).getAbsolutePath(), + "fake", + "fakeRepo", + env, + 10 * 1000, + null, + null, + "anonymous", + false + ); + + intpGroup2 = new InterpreterGroup("intpGroup2"); + intpGroup2.put("note", new LinkedList<Interpreter>()); + intpGroup2.get("note").add(intp2); + intp2.setInterpreterGroup(intpGroup2); context = new InterpreterContext( "note", @@ -76,13 +117,26 @@ public class DistributedResourcePoolTest extends AbstractInterpreterTest { intp1.open(); intp2.open(); - eventPoller1 = intp1.getInterpreterGroup().getRemoteInterpreterProcess().getRemoteInterpreterEventPoller(); - eventPoller2 = intp1.getInterpreterGroup().getRemoteInterpreterProcess().getRemoteInterpreterEventPoller(); + eventPoller1 = new RemoteInterpreterEventPoller(null, null); + eventPoller1.setInterpreterGroup(intpGroup1); + eventPoller1.setInterpreterProcess(intpGroup1.getRemoteInterpreterProcess()); + + eventPoller2 = new RemoteInterpreterEventPoller(null, null); + eventPoller2.setInterpreterGroup(intpGroup2); + eventPoller2.setInterpreterProcess(intpGroup2.getRemoteInterpreterProcess()); + + eventPoller1.start(); + eventPoller2.start(); } @After public void tearDown() throws Exception { - interpreterSettingManager.close(); + eventPoller1.shutdown(); + intp1.close(); + intpGroup1.close(); + eventPoller2.shutdown(); + intp2.close(); + intpGroup2.close(); } @Test @@ -181,13 +235,13 @@ public class DistributedResourcePoolTest extends AbstractInterpreterTest { // then get all resources. - assertEquals(4, interpreterSettingManager.getAllResources().size()); + assertEquals(4, ResourcePoolUtils.getAllResources().size()); // when remove all resources from note1 - interpreterSettingManager.removeResourcesBelongsToNote("note1"); + ResourcePoolUtils.removeResourcesBelongsToNote("note1"); // then resources should be removed. - assertEquals(2, interpreterSettingManager.getAllResources().size()); + assertEquals(2, ResourcePoolUtils.getAllResources().size()); assertEquals("", gson.fromJson( intp1.interpret("get note1:paragraph1:key1", context).message().get(0).getData(), String.class)); @@ -197,10 +251,10 @@ public class DistributedResourcePoolTest extends AbstractInterpreterTest { // when remove all resources from note2:paragraph1 - interpreterSettingManager.removeResourcesBelongsToParagraph("note2", "paragraph1"); + ResourcePoolUtils.removeResourcesBelongsToParagraph("note2", "paragraph1"); // then 1 - assertEquals(1, interpreterSettingManager.getAllResources().size()); + assertEquals(1, ResourcePoolUtils.getAllResources().size()); assertEquals("value2", gson.fromJson( intp1.interpret("get note2:paragraph2:key2", context).message().get(0).getData(), String.class)); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java new file mode 100644 index 0000000..ebb5100 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.scheduler; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.Properties; + +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; +import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA; +import org.apache.zeppelin.resource.LocalResourcePool; +import org.apache.zeppelin.scheduler.Job.Status; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class RemoteSchedulerTest implements RemoteInterpreterProcessListener { + + private static final String INTERPRETER_SCRIPT = + System.getProperty("os.name").startsWith("Windows") ? + "../bin/interpreter.cmd" : + "../bin/interpreter.sh"; + private SchedulerFactory schedulerSvc; + private static final int TICK_WAIT = 100; + private static final int MAX_WAIT_CYCLES = 100; + + @Before + public void setUp() throws Exception{ + schedulerSvc = new SchedulerFactory(); + } + + @After + public void tearDown(){ + + } + + @Test + public void test() throws Exception { + Properties p = new Properties(); + final InterpreterGroup intpGroup = new InterpreterGroup(); + Map<String, String> env = new HashMap<>(); + env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); + + final RemoteInterpreter intpA = new RemoteInterpreter( + p, + "note", + MockInterpreterA.class.getName(), + new File(INTERPRETER_SCRIPT).getAbsolutePath(), + "fake", + "fakeRepo", + env, + 10 * 1000, + this, + null, + "anonymous", + false); + + intpGroup.put("note", new LinkedList<Interpreter>()); + intpGroup.get("note").add(intpA); + intpA.setInterpreterGroup(intpGroup); + + intpA.open(); + + Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note", + intpA.getInterpreterProcess(), + 10); + + Job job = new Job("jobId", "jobName", null, 200) { + Object results; + @Override + public Object getReturn() { + return results; + } + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + intpA.interpret("1000", new InterpreterContext( + "note", + "jobId", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null)); + return "1000"; + } + + @Override + protected boolean jobAbort() { + return false; + } + + @Override + public void setResult(Object results) { + this.results = results; + } + }; + scheduler.submit(job); + + int cycles = 0; + while (!job.isRunning() && cycles < MAX_WAIT_CYCLES) { + Thread.sleep(TICK_WAIT); + cycles++; + } + assertTrue(job.isRunning()); + + Thread.sleep(5*TICK_WAIT); + assertEquals(0, scheduler.getJobsWaiting().size()); + assertEquals(1, scheduler.getJobsRunning().size()); + + cycles = 0; + while (!job.isTerminated() && cycles < MAX_WAIT_CYCLES) { + Thread.sleep(TICK_WAIT); + cycles++; + } + + assertTrue(job.isTerminated()); + assertEquals(0, scheduler.getJobsWaiting().size()); + assertEquals(0, scheduler.getJobsRunning().size()); + + intpA.close(); + schedulerSvc.removeScheduler("test"); + } + + @Test + public void testAbortOnPending() throws Exception { + Properties p = new Properties(); + final InterpreterGroup intpGroup = new InterpreterGroup(); + Map<String, String> env = new HashMap<>(); + env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); + + final RemoteInterpreter intpA = new RemoteInterpreter( + p, + "note", + MockInterpreterA.class.getName(), + new File(INTERPRETER_SCRIPT).getAbsolutePath(), + "fake", + "fakeRepo", + env, + 10 * 1000, + this, + null, + "anonymous", + false); + + intpGroup.put("note", new LinkedList<Interpreter>()); + intpGroup.get("note").add(intpA); + intpA.setInterpreterGroup(intpGroup); + + intpA.open(); + + Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", "note", + intpA.getInterpreterProcess(), + 10); + + Job job1 = new Job("jobId1", "jobName1", null, 200) { + Object results; + InterpreterContext context = new InterpreterContext( + "note", + "jobId1", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null); + + @Override + public Object getReturn() { + return results; + } + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + intpA.interpret("1000", context); + return "1000"; + } + + @Override + protected boolean jobAbort() { + if (isRunning()) { + intpA.cancel(context); + } + return true; + } + + @Override + public void setResult(Object results) { + this.results = results; + } + }; + + Job job2 = new Job("jobId2", "jobName2", null, 200) { + public Object results; + InterpreterContext context = new InterpreterContext( + "note", + "jobId2", + null, + "title", + "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("pool1"), + new LinkedList<InterpreterContextRunner>(), null); + + @Override + public Object getReturn() { + return results; + } + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + intpA.interpret("1000", context); + return "1000"; + } + + @Override + protected boolean jobAbort() { + if (isRunning()) { + intpA.cancel(context); + } + return true; + } + + @Override + public void setResult(Object results) { + this.results = results; + } + }; + + job2.setResult("result2"); + + scheduler.submit(job1); + scheduler.submit(job2); + + + int cycles = 0; + while (!job1.isRunning() && cycles < MAX_WAIT_CYCLES) { + Thread.sleep(TICK_WAIT); + cycles++; + } + assertTrue(job1.isRunning()); + assertTrue(job2.getStatus() == Status.PENDING); + + job2.abort(); + + cycles = 0; + while (!job1.isTerminated() && cycles < MAX_WAIT_CYCLES) { + Thread.sleep(TICK_WAIT); + cycles++; + } + + assertNotNull(job1.getDateFinished()); + assertTrue(job1.isTerminated()); + assertNull(job2.getDateFinished()); + assertTrue(job2.isTerminated()); + assertEquals("result2", job2.getReturn()); + + intpA.close(); + schedulerSvc.removeScheduler("test"); + } + + @Override + public void onOutputAppend(String noteId, String paragraphId, int index, String output) { + + } + + @Override + public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) { + + } + + @Override + public void onOutputClear(String noteId, String paragraphId) { + + } + + @Override + public void onMetaInfosReceived(String settingId, Map<String, String> metaInfos) { + + } + + @Override + public void onGetParagraphRunners(String noteId, String paragraphId, RemoteWorksEventListener callback) { + if (callback != null) { + callback.onFinished(new LinkedList<>()); + } + } + + @Override + public void onRemoteRunParagraph(String noteId, String PsaragraphID) throws Exception { + } + + @Override + public void onParaInfosReceived(String noteId, String paragraphId, + String interpreterSettingId, Map<String, String> metaInfos) { + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/resources/conf/interpreter.json ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/resources/conf/interpreter.json b/zeppelin-zengine/src/test/resources/conf/interpreter.json deleted file mode 100644 index 9e26dfe..0000000 --- a/zeppelin-zengine/src/test/resources/conf/interpreter.json +++ /dev/null @@ -1 +0,0 @@ -{} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/resources/interpreter/mock/interpreter-setting.json ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/resources/interpreter/mock/interpreter-setting.json b/zeppelin-zengine/src/test/resources/interpreter/mock/interpreter-setting.json new file mode 100644 index 0000000..65568ef --- /dev/null +++ b/zeppelin-zengine/src/test/resources/interpreter/mock/interpreter-setting.json @@ -0,0 +1,12 @@ +[ + { + "group": "mock11", + "name": "mock11", + "className": "org.apache.zeppelin.interpreter.mock.MockInterpreter11", + "properties": { + }, + "editor": { + "language": "java" + } + } +] http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/resources/interpreter/mock1/interpreter-setting.json ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/resources/interpreter/mock1/interpreter-setting.json b/zeppelin-zengine/src/test/resources/interpreter/mock1/interpreter-setting.json deleted file mode 100644 index 0e6fb21..0000000 --- a/zeppelin-zengine/src/test/resources/interpreter/mock1/interpreter-setting.json +++ /dev/null @@ -1,19 +0,0 @@ -[ - { - "group": "mock1", - "name": "mock1", - "className": "org.apache.zeppelin.interpreter.mock.MockInterpreter1", - "properties": { - }, - "option": { - "remote": false, - "port": -1, - "perNote": "shared", - "perUser": "shared", - "isExistingProcess": false, - "setPermission": false, - "users": [], - "isUserImpersonate": false - } - } -] http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/resources/interpreter/mock2/interpreter-setting.json ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/resources/interpreter/mock2/interpreter-setting.json b/zeppelin-zengine/src/test/resources/interpreter/mock2/interpreter-setting.json deleted file mode 100644 index aca418a..0000000 --- a/zeppelin-zengine/src/test/resources/interpreter/mock2/interpreter-setting.json +++ /dev/null @@ -1,19 +0,0 @@ -[ - { - "group": "mock2", - "name": "mock2", - "className": "org.apache.zeppelin.interpreter.mock.MockInterpreter2", - "properties": { - }, - "option": { - "remote": false, - "port": -1, - "perNote": "shared", - "perUser": "isolated", - "isExistingProcess": false, - "setPermission": false, - "users": [], - "isUserImpersonate": false - } - } -] http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/resources/interpreter/mock_resource_pool/interpreter-setting.json ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/resources/interpreter/mock_resource_pool/interpreter-setting.json b/zeppelin-zengine/src/test/resources/interpreter/mock_resource_pool/interpreter-setting.json deleted file mode 100644 index 4dfe0a7..0000000 --- a/zeppelin-zengine/src/test/resources/interpreter/mock_resource_pool/interpreter-setting.json +++ /dev/null @@ -1,19 +0,0 @@ -[ - { - "group": "mock_resource_pool", - "name": "mock_resource_pool", - "className": "org.apache.zeppelin.interpreter.remote.mock.MockInterpreterResourcePool", - "properties": { - }, - "option": { - "remote": true, - "port": -1, - "perNote": "shared", - "perUser": "shared", - "isExistingProcess": false, - "setPermission": false, - "users": [], - "isUserImpersonate": false - } - } -] http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/resources/log4j.properties b/zeppelin-zengine/src/test/resources/log4j.properties index 74f619b..001a222 100644 --- a/zeppelin-zengine/src/test/resources/log4j.properties +++ b/zeppelin-zengine/src/test/resources/log4j.properties @@ -35,6 +35,7 @@ log4j.logger.org.apache.hadoop.mapred=WARN log4j.logger.org.apache.hadoop.hive.ql=WARN log4j.logger.org.apache.hadoop.hive.metastore=WARN log4j.logger.org.apache.haadoop.hive.service.HiveServer=WARN +log4j.logger.org.apache.zeppelin.scheduler=WARN log4j.logger.org.quartz=WARN log4j.logger.DataNucleus=WARN @@ -44,6 +45,4 @@ log4j.logger.DataNucleus.Datastore=ERROR # Log all JDBC parameters log4j.logger.org.hibernate.type=ALL -log4j.logger.org.apache.zeppelin.interpreter=DEBUG -log4j.logger.org.apache.zeppelin.scheduler=DEBUG