Repository: zeppelin
Updated Branches:
  refs/heads/master 8d4902e71 -> 2a3791020


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index 603be2e..e1a20b5 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -17,27 +17,31 @@
 
 package org.apache.zeppelin.notebook;
 
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+import com.google.common.collect.Maps;
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import com.google.common.collect.Sets;
 import org.apache.commons.io.FileUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
+import org.apache.zeppelin.dep.Dependency;
+import org.apache.zeppelin.dep.DependencyResolver;
 import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.interpreter.AbstractInterpreterTest;
-import org.apache.zeppelin.interpreter.ClassloaderInterpreter;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterFactory;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterOption;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResultMessage;
-import org.apache.zeppelin.interpreter.InterpreterSetting;
-import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
+import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
 import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
 import org.apache.zeppelin.notebook.repo.NotebookRepo;
 import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
 import org.apache.zeppelin.resource.LocalResourcePool;
+import org.apache.zeppelin.resource.ResourcePoolUtils;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
@@ -52,35 +56,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.sonatype.aether.RepositoryException;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-
-public class NotebookTest extends AbstractInterpreterTest implements 
JobListenerFactory {
+public class NotebookTest implements JobListenerFactory{
   private static final Logger logger = 
LoggerFactory.getLogger(NotebookTest.class);
 
+  private File tmpDir;
+  private ZeppelinConfiguration conf;
   private SchedulerFactory schedulerFactory;
+  private File notebookDir;
   private Notebook notebook;
   private NotebookRepo notebookRepo;
+  private InterpreterFactory factory;
+  private InterpreterSettingManager interpreterSettingManager;
+  private DependencyResolver depResolver;
   private NotebookAuthorization notebookAuthorization;
   private Credentials credentials;
   private AuthenticationInfo anonymous = AuthenticationInfo.ANONYMOUS;
@@ -88,30 +75,57 @@ public class NotebookTest extends AbstractInterpreterTest 
implements JobListener
 
   @Before
   public void setUp() throws Exception {
-    System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_PUBLIC.getVarName(), "true");
-    System.setProperty(ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName(), 
"mock1,mock2");
-    super.setUp();
 
-    schedulerFactory = SchedulerFactory.singleton();
+    tmpDir = new 
File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis());
+    tmpDir.mkdirs();
+    new File(tmpDir, "conf").mkdirs();
+    notebookDir = new File(tmpDir + "/notebook");
+    notebookDir.mkdirs();
+
+    System.setProperty(ConfVars.ZEPPELIN_CONF_DIR.getVarName(), 
tmpDir.toString() + "/conf");
+    System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), 
tmpDir.getAbsolutePath());
+    System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), 
notebookDir.getAbsolutePath());
+
+    conf = ZeppelinConfiguration.create();
+
+    this.schedulerFactory = new SchedulerFactory();
+
+    depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + 
"/local-repo");
+    interpreterSettingManager = new InterpreterSettingManager(conf, 
depResolver, new InterpreterOption(false));
+    factory = new InterpreterFactory(conf, null, null, null, depResolver, 
false, interpreterSettingManager);
+
+    ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>();
+    interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), 
"mock1", true, new HashMap<String, Object>()));
+    interpreterSettingManager.add("mock1", interpreterInfos, new 
ArrayList<Dependency>(), new InterpreterOption(),
+        Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null);
+    interpreterSettingManager.createNewSetting("mock1", "mock1", new 
ArrayList<Dependency>(), new InterpreterOption(), new HashMap<String, 
InterpreterProperty>());
+
+    ArrayList<InterpreterInfo> interpreterInfos2 = new ArrayList<>();
+    interpreterInfos2.add(new 
InterpreterInfo(MockInterpreter2.class.getName(), "mock2", true, new 
HashMap<String, Object>()));
+    interpreterSettingManager.add("mock2", interpreterInfos2, new 
ArrayList<Dependency>(), new InterpreterOption(),
+        Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock2", null);
+    interpreterSettingManager.createNewSetting("mock2", "mock2", new 
ArrayList<Dependency>(), new InterpreterOption(), new HashMap<String, 
InterpreterProperty>());
+
 
     SearchService search = mock(SearchService.class);
     notebookRepo = new VFSNotebookRepo(conf);
     notebookAuthorization = NotebookAuthorization.init(conf);
     credentials = new Credentials(conf.credentialsPersist(), 
conf.getCredentialsPath());
 
-    notebook = new Notebook(conf, notebookRepo, schedulerFactory, 
interpreterFactory, interpreterSettingManager, this, search,
+    notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, 
interpreterSettingManager, this, search,
         notebookAuthorization, credentials);
+    System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_PUBLIC.getVarName(), "true");
   }
 
   @After
   public void tearDown() throws Exception {
-    super.tearDown();
+    delete(tmpDir);
   }
 
   @Test
   public void testSelectingReplImplementation() throws IOException {
     Note note = notebook.createNote(anonymous);
-    interpreterSettingManager.setInterpreterBinding(anonymous.getUser(), 
note.getId(), interpreterSettingManager.getInterpreterSettingIds());
+    interpreterSettingManager.setInterpreters(anonymous.getUser(), 
note.getId(), interpreterSettingManager.getDefaultInterpreterSettingList());
 
     // run with default repl
     Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
@@ -245,7 +259,7 @@ public class NotebookTest extends AbstractInterpreterTest 
implements JobListener
 
     Notebook notebook2 = new Notebook(
         conf, notebookRepo, schedulerFactory,
-        new InterpreterFactory(interpreterSettingManager),
+        new InterpreterFactory(conf, null, null, null, depResolver, false, 
interpreterSettingManager),
         interpreterSettingManager, null, null, null, null);
 
     assertEquals(1, notebook2.getAllNotes().size());
@@ -302,7 +316,7 @@ public class NotebookTest extends AbstractInterpreterTest 
implements JobListener
   @Test
   public void testRunAll() throws IOException {
     Note note = notebook.createNote(anonymous);
-    interpreterSettingManager.setInterpreterBinding("user", note.getId(), 
interpreterSettingManager.getInterpreterSettingIds());
+    interpreterSettingManager.setInterpreters("user", note.getId(), 
interpreterSettingManager.getDefaultInterpreterSettingList());
 
     // p1
     Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
@@ -341,7 +355,7 @@ public class NotebookTest extends AbstractInterpreterTest 
implements JobListener
   public void testSchedule() throws InterruptedException, IOException {
     // create a note and a paragraph
     Note note = notebook.createNote(anonymous);
-    interpreterSettingManager.setInterpreterBinding("user", note.getId(), 
interpreterSettingManager.getInterpreterSettingIds());
+    interpreterSettingManager.setInterpreters("user", note.getId(), 
interpreterSettingManager.getDefaultInterpreterSettingList());
 
     Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
     Map config = new HashMap<>();
@@ -414,8 +428,8 @@ public class NotebookTest extends AbstractInterpreterTest 
implements JobListener
   public void testAutoRestartInterpreterAfterSchedule() throws 
InterruptedException, IOException{
     // create a note and a paragraph
     Note note = notebook.createNote(anonymous);
-    interpreterSettingManager.setInterpreterBinding(anonymous.getUser(), 
note.getId(), interpreterSettingManager.getInterpreterSettingIds());
-
+    interpreterSettingManager.setInterpreters(anonymous.getUser(), 
note.getId(), interpreterSettingManager.getDefaultInterpreterSettingList());
+    
     Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
     Map config = new HashMap<>();
     p.setConfig(config);
@@ -435,11 +449,11 @@ public class NotebookTest extends AbstractInterpreterTest 
implements JobListener
 
 
     MockInterpreter1 mock1 = ((MockInterpreter1) (((ClassloaderInterpreter)
-        ((LazyOpenInterpreter) 
interpreterFactory.getInterpreter(anonymous.getUser(), note.getId(), 
"mock1")).getInnerInterpreter())
+        ((LazyOpenInterpreter) factory.getInterpreter(anonymous.getUser(), 
note.getId(), "mock1")).getInnerInterpreter())
         .getInnerInterpreter()));
 
     MockInterpreter2 mock2 = ((MockInterpreter2) (((ClassloaderInterpreter)
-        ((LazyOpenInterpreter) 
interpreterFactory.getInterpreter(anonymous.getUser(), note.getId(), 
"mock2")).getInnerInterpreter())
+        ((LazyOpenInterpreter) factory.getInterpreter(anonymous.getUser(), 
note.getId(), "mock2")).getInnerInterpreter())
         .getInnerInterpreter()));
 
     // wait until interpreters are started
@@ -453,9 +467,9 @@ public class NotebookTest extends AbstractInterpreterTest 
implements JobListener
     }
 
     // remove cron scheduler.
-//    config.put("cron", null);
-//    note.setConfig(config);
-//    notebook.refreshCron(note.getId());
+    config.put("cron", null);
+    note.setConfig(config);
+    notebook.refreshCron(note.getId());
 
     // make sure all paragraph has been executed
     assertNotNull(p.getDateFinished());
@@ -467,7 +481,7 @@ public class NotebookTest extends AbstractInterpreterTest 
implements JobListener
   public void testExportAndImportNote() throws IOException, 
CloneNotSupportedException,
           InterruptedException, InterpreterException, SchedulerException, 
RepositoryException {
     Note note = notebook.createNote(anonymous);
-    interpreterSettingManager.setInterpreterBinding("user", note.getId(), 
interpreterSettingManager.getInterpreterSettingIds());
+    interpreterSettingManager.setInterpreters("user", note.getId(), 
interpreterSettingManager.getDefaultInterpreterSettingList());
 
     final Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
     String simpleText = "hello world";
@@ -506,7 +520,7 @@ public class NotebookTest extends AbstractInterpreterTest 
implements JobListener
   public void testCloneNote() throws IOException, CloneNotSupportedException,
       InterruptedException, InterpreterException, SchedulerException, 
RepositoryException {
     Note note = notebook.createNote(anonymous);
-    interpreterSettingManager.setInterpreterBinding("user", note.getId(), 
interpreterSettingManager.getInterpreterSettingIds());
+    interpreterSettingManager.setInterpreters("user", note.getId(), 
interpreterSettingManager.getDefaultInterpreterSettingList());
 
     final Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
     p.setText("hello world");
@@ -540,7 +554,7 @@ public class NotebookTest extends AbstractInterpreterTest 
implements JobListener
   public void testCloneNoteWithNoName() throws IOException, 
CloneNotSupportedException,
       InterruptedException {
     Note note = notebook.createNote(anonymous);
-    interpreterSettingManager.setInterpreterBinding(anonymous.getUser(), 
note.getId(), interpreterSettingManager.getInterpreterSettingIds());
+    interpreterSettingManager.setInterpreters(anonymous.getUser(), 
note.getId(), interpreterSettingManager.getDefaultInterpreterSettingList());
 
     Note cloneNote = notebook.cloneNote(note.getId(), null, anonymous);
     assertEquals(cloneNote.getName(), "Note " + cloneNote.getId());
@@ -552,7 +566,7 @@ public class NotebookTest extends AbstractInterpreterTest 
implements JobListener
   public void testCloneNoteWithExceptionResult() throws IOException, 
CloneNotSupportedException,
       InterruptedException {
     Note note = notebook.createNote(anonymous);
-    interpreterSettingManager.setInterpreterBinding(anonymous.getUser(), 
note.getId(), interpreterSettingManager.getInterpreterSettingIds());
+    interpreterSettingManager.setInterpreters(anonymous.getUser(), 
note.getId(), interpreterSettingManager.getDefaultInterpreterSettingList());
 
     final Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
     p.setText("hello world");
@@ -577,28 +591,28 @@ public class NotebookTest extends AbstractInterpreterTest 
implements JobListener
   @Test
   public void testResourceRemovealOnParagraphNoteRemove() throws IOException {
     Note note = notebook.createNote(anonymous);
-    interpreterSettingManager.setInterpreterBinding(anonymous.getUser(), 
note.getId(), interpreterSettingManager.getInterpreterSettingIds());
-
+    interpreterSettingManager.setInterpreters(anonymous.getUser(), 
note.getId(), interpreterSettingManager.getDefaultInterpreterSettingList());
+    for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
+      intpGroup.setResourcePool(new LocalResourcePool(intpGroup.getId()));
+    }
     Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
     p1.setText("hello");
     Paragraph p2 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
     p2.setText("%mock2 world");
-    for (InterpreterGroup intpGroup : 
interpreterSettingManager.getAllInterpreterGroup()) {
-      intpGroup.setResourcePool(new LocalResourcePool(intpGroup.getId()));
-    }
+
     note.runAll();
     while (p1.isTerminated() == false || p1.getResult() == null) 
Thread.yield();
     while (p2.isTerminated() == false || p2.getResult() == null) 
Thread.yield();
 
-    assertEquals(2, interpreterSettingManager.getAllResources().size());
+    assertEquals(2, ResourcePoolUtils.getAllResources().size());
 
     // remove a paragraph
     note.removeParagraph(anonymous.getUser(), p1.getId());
-    assertEquals(1, interpreterSettingManager.getAllResources().size());
+    assertEquals(1, ResourcePoolUtils.getAllResources().size());
 
     // remove note
     notebook.removeNote(note.getId(), anonymous);
-    assertEquals(0, interpreterSettingManager.getAllResources().size());
+    assertEquals(0, ResourcePoolUtils.getAllResources().size());
   }
 
   @Test
@@ -606,10 +620,10 @@ public class NotebookTest extends AbstractInterpreterTest 
implements JobListener
       IOException {
     // create a note and a paragraph
     Note note = notebook.createNote(anonymous);
-    interpreterSettingManager.setInterpreterBinding(anonymous.getUser(), 
note.getId(), interpreterSettingManager.getInterpreterSettingIds());
+    interpreterSettingManager.setInterpreters(anonymous.getUser(), 
note.getId(), interpreterSettingManager.getDefaultInterpreterSettingList());
 
     AngularObjectRegistry registry = interpreterSettingManager
-        
.getInterpreterSettings(note.getId()).get(0).getOrCreateInterpreterGroup(anonymous.getUser(),
 "sharedProcess")
+        
.getInterpreterSettings(note.getId()).get(0).getInterpreterGroup(anonymous.getUser(),
 "sharedProcess")
         .getAngularObjectRegistry();
 
     Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
@@ -639,10 +653,10 @@ public class NotebookTest extends AbstractInterpreterTest 
implements JobListener
       IOException {
     // create a note and a paragraph
     Note note = notebook.createNote(anonymous);
-    interpreterSettingManager.setInterpreterBinding(anonymous.getUser(), 
note.getId(), interpreterSettingManager.getInterpreterSettingIds());
+    interpreterSettingManager.setInterpreters(anonymous.getUser(), 
note.getId(), interpreterSettingManager.getDefaultInterpreterSettingList());
 
     AngularObjectRegistry registry = interpreterSettingManager
-        
.getInterpreterSettings(note.getId()).get(0).getOrCreateInterpreterGroup(anonymous.getUser(),
 "sharedProcess")
+        
.getInterpreterSettings(note.getId()).get(0).getInterpreterGroup(anonymous.getUser(),
 "sharedProcess")
         .getAngularObjectRegistry();
 
     Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
@@ -673,10 +687,10 @@ public class NotebookTest extends AbstractInterpreterTest 
implements JobListener
       IOException {
     // create a note and a paragraph
     Note note = notebook.createNote(anonymous);
-    interpreterSettingManager.setInterpreterBinding(anonymous.getUser(), 
note.getId(), interpreterSettingManager.getInterpreterSettingIds());
+    interpreterSettingManager.setInterpreters(anonymous.getUser(), 
note.getId(), interpreterSettingManager.getDefaultInterpreterSettingList());
 
     AngularObjectRegistry registry = interpreterSettingManager
-        
.getInterpreterSettings(note.getId()).get(0).getOrCreateInterpreterGroup(anonymous.getUser(),
 "sharedProcess")
+        
.getInterpreterSettings(note.getId()).get(0).getInterpreterGroup(anonymous.getUser(),
 "sharedProcess")
         .getAngularObjectRegistry();
 
     // add local scope object
@@ -686,13 +700,14 @@ public class NotebookTest extends AbstractInterpreterTest 
implements JobListener
 
     // restart interpreter
     
interpreterSettingManager.restart(interpreterSettingManager.getInterpreterSettings(note.getId()).get(0).getId());
-    registry = 
interpreterSettingManager.getInterpreterSettings(note.getId()).get(0)
-        .getOrCreateInterpreterGroup(anonymous.getUser(), "sharedProcess")
-        .getAngularObjectRegistry();
-
-    // New InterpreterGroup will be created and its AngularObjectRegistry will 
be created
-    assertNull(registry.get("o1", note.getId(), null));
-    assertNull(registry.get("o2", null, null));
+    registry = 
interpreterSettingManager.getInterpreterSettings(note.getId()).get(0).getInterpreterGroup(anonymous.getUser(),
 "sharedProcess")
+    .getAngularObjectRegistry();
+
+    // local and global scope object should be removed
+    // But InterpreterGroup does not implement angularObjectRegistry per 
session (scoped, isolated)
+    // So for now, does not have good way to remove all objects in particular 
session on restart.
+    assertNotNull(registry.get("o1", note.getId(), null));
+    assertNotNull(registry.get("o2", null, null));
     notebook.removeNote(note.getId(), anonymous);
   }
 
@@ -787,7 +802,7 @@ public class NotebookTest extends AbstractInterpreterTest 
implements JobListener
   public void testAbortParagraphStatusOnInterpreterRestart() throws 
InterruptedException,
       IOException {
     Note note = notebook.createNote(anonymous);
-    interpreterSettingManager.setInterpreterBinding(anonymous.getUser(), 
note.getId(), interpreterSettingManager.getInterpreterSettingIds());
+    interpreterSettingManager.setInterpreters(anonymous.getUser(), 
note.getId(), interpreterSettingManager.getDefaultInterpreterSettingList());
 
     // create three paragraphs
     Paragraph p1 = note.addNewParagraph(anonymous);
@@ -811,11 +826,11 @@ public class NotebookTest extends AbstractInterpreterTest 
implements JobListener
     
interpreterSettingManager.restart(interpreterSettingManager.getInterpreterSettings(note.getId()).get(0).getId());
 
     // make sure three differnt status aborted well.
-//    assertEquals(Status.FINISHED, p1.getStatus());
-//    assertEquals(Status.ABORT, p2.getStatus());
-//    assertEquals(Status.ABORT, p3.getStatus());
-//
-//    notebook.removeNote(note.getId(), anonymous);
+    assertEquals(Status.FINISHED, p1.getStatus());
+    assertEquals(Status.ABORT, p2.getStatus());
+    assertEquals(Status.ABORT, p3.getStatus());
+
+    notebook.removeNote(note.getId(), anonymous);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java
index 7c85778..7bd6819 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/ParagraphTest.java
@@ -21,7 +21,6 @@ package org.apache.zeppelin.notebook;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -40,7 +39,7 @@ import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectBuilder;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.Input;
-import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.Interpreter.FormType;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
@@ -48,7 +47,10 @@ import org.apache.zeppelin.interpreter.InterpreterOption;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.InterpreterResult.Type;
+import org.apache.zeppelin.interpreter.InterpreterResultMessage;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.InterpreterSetting.Status;
+import org.apache.zeppelin.interpreter.InterpreterSettingManager;
 import org.apache.zeppelin.resource.ResourcePool;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.apache.zeppelin.user.Credentials;
@@ -56,7 +58,6 @@ import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.Map;
-
 import org.mockito.Mockito;
 
 public class ParagraphTest {
@@ -185,7 +186,7 @@ public class ParagraphTest {
     when(mockInterpreterOption.permissionIsSet()).thenReturn(false);
     when(mockInterpreterSetting.getStatus()).thenReturn(Status.READY);
     when(mockInterpreterSetting.getId()).thenReturn("mock_id_1");
-    when(mockInterpreterSetting.getOrCreateInterpreterGroup(anyString(), 
anyString())).thenReturn(mockInterpreterGroup);
+    when(mockInterpreterSetting.getInterpreterGroup(anyString(), 
anyString())).thenReturn(mockInterpreterGroup);
     spyInterpreterSettingList.add(mockInterpreterSetting);
     when(mockNote.getId()).thenReturn("any_id");
     
when(mockInterpreterSettingManager.getInterpreterSettings(anyString())).thenReturn(spyInterpreterSettingList);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
index 14c8789..803912e 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java
@@ -33,13 +33,10 @@ import org.apache.commons.io.FileUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
 import org.apache.zeppelin.dep.DependencyResolver;
-import org.apache.zeppelin.display.AngularObjectRegistryListener;
-import org.apache.zeppelin.helium.ApplicationEventListener;
 import org.apache.zeppelin.interpreter.InterpreterFactory;
 import org.apache.zeppelin.interpreter.InterpreterOption;
 import org.apache.zeppelin.interpreter.InterpreterResultMessage;
 import org.apache.zeppelin.interpreter.InterpreterSettingManager;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
 import org.apache.zeppelin.notebook.*;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
@@ -72,7 +69,7 @@ public class NotebookRepoSyncTest implements 
JobListenerFactory {
   private Credentials credentials;
   private AuthenticationInfo anonymous;
   private static final Logger LOG = 
LoggerFactory.getLogger(NotebookRepoSyncTest.class);
-
+  
   @Before
   public void setUp() throws Exception {
     String zpath = 
System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis();
@@ -94,13 +91,12 @@ public class NotebookRepoSyncTest implements 
JobListenerFactory {
     LOG.info("secondary note dir : " + secNotePath);
     conf = ZeppelinConfiguration.create();
 
-    this.schedulerFactory = SchedulerFactory.singleton();
+    this.schedulerFactory = new SchedulerFactory();
 
     depResolver = new DependencyResolver(mainZepDir.getAbsolutePath() + 
"/local-repo");
-    interpreterSettingManager = new InterpreterSettingManager(conf,
-        mock(AngularObjectRegistryListener.class), 
mock(RemoteInterpreterProcessListener.class), 
mock(ApplicationEventListener.class));
-    factory = new InterpreterFactory(interpreterSettingManager);
-
+    interpreterSettingManager = new InterpreterSettingManager(conf, 
depResolver, new InterpreterOption(true));
+    factory = new InterpreterFactory(conf, null, null, null, depResolver, 
false, interpreterSettingManager);
+    
     search = mock(SearchService.class);
     notebookRepoSync = new NotebookRepoSync(conf);
     notebookAuthorization = NotebookAuthorization.init(conf);
@@ -114,19 +110,19 @@ public class NotebookRepoSyncTest implements 
JobListenerFactory {
   public void tearDown() throws Exception {
     delete(mainZepDir);
   }
-
+  
   @Test
   public void testRepoCount() throws IOException {
     assertTrue(notebookRepoSync.getMaxRepoNum() >= 
notebookRepoSync.getRepoCount());
   }
-
+  
   @Test
   public void testSyncOnCreate() throws IOException {
     /* check that both storage systems are empty */
     assertTrue(notebookRepoSync.getRepoCount() > 1);
     assertEquals(0, notebookRepoSync.list(0, anonymous).size());
     assertEquals(0, notebookRepoSync.list(1, anonymous).size());
-
+    
     /* create note */
     Note note = notebookSync.createNote(anonymous);
 
@@ -134,7 +130,7 @@ public class NotebookRepoSyncTest implements 
JobListenerFactory {
     assertEquals(1, notebookRepoSync.list(0, anonymous).size());
     assertEquals(1, notebookRepoSync.list(1, anonymous).size());
     assertEquals(notebookRepoSync.list(0, 
anonymous).get(0).getId(),notebookRepoSync.list(1, anonymous).get(0).getId());
-
+    
     notebookSync.removeNote(notebookRepoSync.list(0, null).get(0).getId(), 
anonymous);
   }
 
@@ -144,26 +140,26 @@ public class NotebookRepoSyncTest implements 
JobListenerFactory {
     assertTrue(notebookRepoSync.getRepoCount() > 1);
     assertEquals(0, notebookRepoSync.list(0, anonymous).size());
     assertEquals(0, notebookRepoSync.list(1, anonymous).size());
-
+    
     Note note = notebookSync.createNote(anonymous);
 
     /* check that created in both storage systems */
     assertEquals(1, notebookRepoSync.list(0, anonymous).size());
     assertEquals(1, notebookRepoSync.list(1, anonymous).size());
     assertEquals(notebookRepoSync.list(0, 
anonymous).get(0).getId(),notebookRepoSync.list(1, anonymous).get(0).getId());
-
+    
     /* remove Note */
     notebookSync.removeNote(notebookRepoSync.list(0, 
anonymous).get(0).getId(), anonymous);
-
+    
     /* check that deleted in both storages */
     assertEquals(0, notebookRepoSync.list(0, anonymous).size());
     assertEquals(0, notebookRepoSync.list(1, anonymous).size());
-
+    
   }
-
+  
   @Test
   public void testSyncUpdateMain() throws IOException {
-
+    
     /* create note */
     Note note = notebookSync.createNote(anonymous);
     Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
@@ -171,19 +167,19 @@ public class NotebookRepoSyncTest implements 
JobListenerFactory {
     config.put("enabled", true);
     p1.setConfig(config);
     p1.setText("hello world");
-
+    
     /* new paragraph exists in note instance */
     assertEquals(1, note.getParagraphs().size());
-
+    
     /* new paragraph not yet saved into storages */
     assertEquals(0, notebookRepoSync.get(0,
         notebookRepoSync.list(0, anonymous).get(0).getId(), 
anonymous).getParagraphs().size());
     assertEquals(0, notebookRepoSync.get(1,
         notebookRepoSync.list(1, anonymous).get(0).getId(), 
anonymous).getParagraphs().size());
-
-    /* save to storage under index 0 (first storage) */
+    
+    /* save to storage under index 0 (first storage) */ 
     notebookRepoSync.save(0, note, anonymous);
-
+    
     /* check paragraph saved to first storage */
     assertEquals(1, notebookRepoSync.get(0,
         notebookRepoSync.list(0, anonymous).get(0).getId(), 
anonymous).getParagraphs().size());
@@ -288,45 +284,45 @@ public class NotebookRepoSyncTest implements 
JobListenerFactory {
     // one git versioned storage initialized
     assertThat(vRepoSync.getRepoCount()).isEqualTo(1);
     assertThat(vRepoSync.getRepo(0)).isInstanceOf(GitNotebookRepo.class);
-
+    
     GitNotebookRepo gitRepo = (GitNotebookRepo) vRepoSync.getRepo(0);
-
+    
     // no notes
     assertThat(vRepoSync.list(anonymous).size()).isEqualTo(0);
     // create note
     Note note = vNotebookSync.createNote(anonymous);
     assertThat(vRepoSync.list(anonymous).size()).isEqualTo(1);
-
+    
     String noteId = vRepoSync.list(anonymous).get(0).getId();
     // first checkpoint
     vRepoSync.checkpoint(noteId, "checkpoint message", anonymous);
     int vCount = gitRepo.revisionHistory(noteId, anonymous).size();
     assertThat(vCount).isEqualTo(1);
-
+    
     Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
     Map<String, Object> config = p.getConfig();
     config.put("enabled", true);
     p.setConfig(config);
     p.setText("%md checkpoint test");
-
+    
     // save and checkpoint again
     vRepoSync.save(note, anonymous);
     vRepoSync.checkpoint(noteId, "checkpoint message 2", anonymous);
     assertThat(gitRepo.revisionHistory(noteId, 
anonymous).size()).isEqualTo(vCount + 1);
     notebookRepoSync.remove(note.getId(), anonymous);
   }
-
+  
   @Test
   public void testSyncWithAcl() throws IOException {
     /* scenario 1 - note exists with acl on main storage */
     AuthenticationInfo user1 = new AuthenticationInfo("user1");
     Note note = notebookSync.createNote(user1);
     assertEquals(0, note.getParagraphs().size());
-
+    
     // saved on both storages
     assertEquals(1, notebookRepoSync.list(0, null).size());
     assertEquals(1, notebookRepoSync.list(1, null).size());
-
+    
     /* check that user1 is the only owner */
     NotebookAuthorization authInfo = NotebookAuthorization.getInstance();
     Set<String> entity = new HashSet<String>();
@@ -335,23 +331,23 @@ public class NotebookRepoSyncTest implements 
JobListenerFactory {
     assertEquals(1, authInfo.getOwners(note.getId()).size());
     assertEquals(0, authInfo.getReaders(note.getId()).size());
     assertEquals(0, authInfo.getWriters(note.getId()).size());
-
+    
     /* update note and save on secondary storage */
     Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
     p1.setText("hello world");
     assertEquals(1, note.getParagraphs().size());
     notebookRepoSync.save(1, note, null);
-
+    
     /* check paragraph isn't saved into first storage */
     assertEquals(0, notebookRepoSync.get(0,
         notebookRepoSync.list(0, null).get(0).getId(), 
null).getParagraphs().size());
     /* check paragraph is saved into second storage */
     assertEquals(1, notebookRepoSync.get(1,
         notebookRepoSync.list(1, null).get(0).getId(), 
null).getParagraphs().size());
-
+    
     /* now sync by user1 */
     notebookRepoSync.sync(user1);
-
+    
     /* check that note updated and acl are same on main storage*/
     assertEquals(1, notebookRepoSync.get(0,
         notebookRepoSync.list(0, null).get(0).getId(), 
null).getParagraphs().size());
@@ -359,7 +355,7 @@ public class NotebookRepoSyncTest implements 
JobListenerFactory {
     assertEquals(1, authInfo.getOwners(note.getId()).size());
     assertEquals(0, authInfo.getReaders(note.getId()).size());
     assertEquals(0, authInfo.getWriters(note.getId()).size());
-
+    
     /* scenario 2 - note doesn't exist on main storage */
     /* remove from main storage */
     notebookRepoSync.remove(0, note.getId(), user1);
@@ -369,7 +365,7 @@ public class NotebookRepoSyncTest implements 
JobListenerFactory {
     assertEquals(0, authInfo.getOwners(note.getId()).size());
     assertEquals(0, authInfo.getReaders(note.getId()).size());
     assertEquals(0, authInfo.getWriters(note.getId()).size());
-
+    
     /* now sync - should bring note from secondary storage with added acl */
     notebookRepoSync.sync(user1);
     assertEquals(1, notebookRepoSync.list(0, null).size());
@@ -427,5 +423,5 @@ public class NotebookRepoSyncTest implements 
JobListenerFactory {
       }
     };
   }
-
+       
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
index b393589..6f85bf6 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
@@ -22,15 +22,18 @@ import static org.mockito.Mockito.mock;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.io.FileUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
-
 import org.apache.zeppelin.dep.Dependency;
 import org.apache.zeppelin.dep.DependencyResolver;
-import org.apache.zeppelin.interpreter.AbstractInterpreterTest;
 import org.apache.zeppelin.interpreter.InterpreterFactory;
 import org.apache.zeppelin.interpreter.InterpreterInfo;
 import org.apache.zeppelin.interpreter.InterpreterOption;
@@ -38,7 +41,6 @@ import 
org.apache.zeppelin.interpreter.DefaultInterpreterProperty;
 import org.apache.zeppelin.interpreter.InterpreterProperty;
 import org.apache.zeppelin.interpreter.InterpreterSettingManager;
 import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
-
 import org.apache.zeppelin.notebook.JobListenerFactory;
 import org.apache.zeppelin.notebook.Note;
 import org.apache.zeppelin.notebook.Notebook;
@@ -56,34 +58,59 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableMap;
 
-public class VFSNotebookRepoTest extends AbstractInterpreterTest implements 
JobListenerFactory {
-
+public class VFSNotebookRepoTest implements JobListenerFactory {
   private static final Logger LOG = 
LoggerFactory.getLogger(VFSNotebookRepoTest.class);
-
+  private ZeppelinConfiguration conf;
   private SchedulerFactory schedulerFactory;
   private Notebook notebook;
   private NotebookRepo notebookRepo;
+  private InterpreterSettingManager interpreterSettingManager;
+  private InterpreterFactory factory;
+  private DependencyResolver depResolver;
   private NotebookAuthorization notebookAuthorization;
 
+  private File mainZepDir;
+  private File mainNotebookDir;
+
   @Before
   public void setUp() throws Exception {
-    System.setProperty(ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName(), 
"mock1,mock2");
+    String zpath = System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + 
System.currentTimeMillis();
+    mainZepDir = new File(zpath);
+    mainZepDir.mkdirs();
+    new File(mainZepDir, "conf").mkdirs();
+    String mainNotePath = zpath + "/notebook";
+    mainNotebookDir = new File(mainNotePath);
+    mainNotebookDir.mkdirs();
+
+    System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), 
mainZepDir.getAbsolutePath());
+    System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), 
mainNotebookDir.getAbsolutePath());
     System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE.getVarName(), 
"org.apache.zeppelin.notebook.repo.VFSNotebookRepo");
+    conf = ZeppelinConfiguration.create();
+
+    this.schedulerFactory = new SchedulerFactory();
+
+    this.schedulerFactory = new SchedulerFactory();
+    depResolver = new DependencyResolver(mainZepDir.getAbsolutePath() + 
"/local-repo");
+    interpreterSettingManager = new InterpreterSettingManager(conf, 
depResolver, new InterpreterOption(true));
+    factory = new InterpreterFactory(conf, null, null, null, depResolver, 
false, interpreterSettingManager);
 
-    super.setUp();
+    ArrayList<InterpreterInfo> interpreterInfos = new ArrayList<>();
+    interpreterInfos.add(new InterpreterInfo(MockInterpreter1.class.getName(), 
"mock1", true, new HashMap<String, Object>()));
+    interpreterSettingManager.add("mock1", interpreterInfos, new 
ArrayList<Dependency>(), new InterpreterOption(),
+        Maps.<String, DefaultInterpreterProperty>newHashMap(), "mock1", null);
+    interpreterSettingManager.createNewSetting("mock1", "mock1", new 
ArrayList<Dependency>(), new InterpreterOption(), new HashMap<String, 
InterpreterProperty>());
 
-    this.schedulerFactory = SchedulerFactory.singleton();
     SearchService search = mock(SearchService.class);
     notebookRepo = new VFSNotebookRepo(conf);
     notebookAuthorization = NotebookAuthorization.init(conf);
-    notebook = new Notebook(conf, notebookRepo, schedulerFactory, 
interpreterFactory, interpreterSettingManager, this, search,
+    notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, 
interpreterSettingManager, this, search,
         notebookAuthorization, null);
   }
 
   @After
   public void tearDown() throws Exception {
-    if (!FileUtils.deleteQuietly(testRootDir)) {
-      LOG.error("Failed to delete {} ", testRootDir.getName());
+    if (!FileUtils.deleteQuietly(mainZepDir)) {
+      LOG.error("Failed to delete {} ", mainZepDir.getName());
     }
   }
 
@@ -93,7 +120,7 @@ public class VFSNotebookRepoTest extends 
AbstractInterpreterTest implements JobL
     int numNotes = notebookRepo.list(null).size();
 
     // when create invalid json file
-    File testNoteDir = new File(notebookDir, "test");
+    File testNoteDir = new File(mainNotebookDir, "test");
     testNoteDir.mkdir();
     FileUtils.writeStringToFile(new File(testNoteDir, "note.json"), "");
 
@@ -105,7 +132,7 @@ public class VFSNotebookRepoTest extends 
AbstractInterpreterTest implements JobL
   public void testSaveNotebook() throws IOException, InterruptedException {
     AuthenticationInfo anonymous = new AuthenticationInfo("anonymous");
     Note note = notebook.createNote(anonymous);
-    interpreterSettingManager.setInterpreterBinding("user", note.getId(), 
interpreterSettingManager.getInterpreterSettingIds());
+    interpreterSettingManager.setInterpreters("user", note.getId(), 
interpreterSettingManager.getDefaultInterpreterSettingList());
 
     Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
     Map<String, Object> config = p1.getConfig();

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
index 7223109..46134e5 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
@@ -17,34 +17,35 @@
 package org.apache.zeppelin.resource;
 
 import com.google.gson.Gson;
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.user.AuthenticationInfo;
 import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
 import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterResourcePool;
-import org.apache.zeppelin.user.AuthenticationInfo;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
-import java.util.List;
+import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
 
 /**
  * Unittest for DistributedResourcePool
  */
-public class DistributedResourcePoolTest extends AbstractInterpreterTest {
-
+public class DistributedResourcePoolTest {
+  private static final String INTERPRETER_SCRIPT =
+          System.getProperty("os.name").startsWith("Windows") ?
+                  "../bin/interpreter.cmd" :
+                  "../bin/interpreter.sh";
+  private InterpreterGroup intpGroup1;
+  private InterpreterGroup intpGroup2;
+  private HashMap<String, String> env;
   private RemoteInterpreter intp1;
   private RemoteInterpreter intp2;
   private InterpreterContext context;
@@ -54,10 +55,50 @@ public class DistributedResourcePoolTest extends 
AbstractInterpreterTest {
 
   @Before
   public void setUp() throws Exception {
-    super.setUp();
-    InterpreterSetting interpreterSetting = 
interpreterSettingManager.getByName("mock_resource_pool");
-    intp1 = (RemoteInterpreter) interpreterSetting.getInterpreter("user1", 
"note1", "mock_resource_pool");
-    intp2 = (RemoteInterpreter) interpreterSetting.getInterpreter("user2", 
"note1", "mock_resource_pool");
+    env = new HashMap<>();
+    env.put("ZEPPELIN_CLASSPATH", new 
File("./target/test-classes").getAbsolutePath());
+
+    Properties p = new Properties();
+
+    intp1 = new RemoteInterpreter(
+        p,
+        "note",
+        MockInterpreterResourcePool.class.getName(),
+        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+        "fake",
+        "fakeRepo",
+        env,
+        10 * 1000,
+        null,
+        null,
+        "anonymous",
+        false
+    );
+
+    intpGroup1 = new InterpreterGroup("intpGroup1");
+    intpGroup1.put("note", new LinkedList<Interpreter>());
+    intpGroup1.get("note").add(intp1);
+    intp1.setInterpreterGroup(intpGroup1);
+
+    intp2 = new RemoteInterpreter(
+        p,
+        "note",
+        MockInterpreterResourcePool.class.getName(),
+        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+        "fake",
+        "fakeRepo",        
+        env,
+        10 * 1000,
+        null,
+        null,
+        "anonymous",
+        false
+    );
+
+    intpGroup2 = new InterpreterGroup("intpGroup2");
+    intpGroup2.put("note", new LinkedList<Interpreter>());
+    intpGroup2.get("note").add(intp2);
+    intp2.setInterpreterGroup(intpGroup2);
 
     context = new InterpreterContext(
         "note",
@@ -76,13 +117,26 @@ public class DistributedResourcePoolTest extends 
AbstractInterpreterTest {
     intp1.open();
     intp2.open();
 
-    eventPoller1 = 
intp1.getInterpreterGroup().getRemoteInterpreterProcess().getRemoteInterpreterEventPoller();
-    eventPoller2 = 
intp1.getInterpreterGroup().getRemoteInterpreterProcess().getRemoteInterpreterEventPoller();
+    eventPoller1 = new RemoteInterpreterEventPoller(null, null);
+    eventPoller1.setInterpreterGroup(intpGroup1);
+    
eventPoller1.setInterpreterProcess(intpGroup1.getRemoteInterpreterProcess());
+
+    eventPoller2 = new RemoteInterpreterEventPoller(null, null);
+    eventPoller2.setInterpreterGroup(intpGroup2);
+    
eventPoller2.setInterpreterProcess(intpGroup2.getRemoteInterpreterProcess());
+
+    eventPoller1.start();
+    eventPoller2.start();
   }
 
   @After
   public void tearDown() throws Exception {
-    interpreterSettingManager.close();
+    eventPoller1.shutdown();
+    intp1.close();
+    intpGroup1.close();
+    eventPoller2.shutdown();
+    intp2.close();
+    intpGroup2.close();
   }
 
   @Test
@@ -181,13 +235,13 @@ public class DistributedResourcePoolTest extends 
AbstractInterpreterTest {
 
 
     // then get all resources.
-    assertEquals(4, interpreterSettingManager.getAllResources().size());
+    assertEquals(4, ResourcePoolUtils.getAllResources().size());
 
     // when remove all resources from note1
-    interpreterSettingManager.removeResourcesBelongsToNote("note1");
+    ResourcePoolUtils.removeResourcesBelongsToNote("note1");
 
     // then resources should be removed.
-    assertEquals(2, interpreterSettingManager.getAllResources().size());
+    assertEquals(2, ResourcePoolUtils.getAllResources().size());
     assertEquals("", gson.fromJson(
         intp1.interpret("get note1:paragraph1:key1", 
context).message().get(0).getData(),
         String.class));
@@ -197,10 +251,10 @@ public class DistributedResourcePoolTest extends 
AbstractInterpreterTest {
 
 
     // when remove all resources from note2:paragraph1
-    interpreterSettingManager.removeResourcesBelongsToParagraph("note2", 
"paragraph1");
+    ResourcePoolUtils.removeResourcesBelongsToParagraph("note2", "paragraph1");
 
     // then 1
-    assertEquals(1, interpreterSettingManager.getAllResources().size());
+    assertEquals(1, ResourcePoolUtils.getAllResources().size());
     assertEquals("value2", gson.fromJson(
         intp1.interpret("get note2:paragraph2:key2", 
context).message().get(0).getData(),
         String.class));

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
new file mode 100644
index 0000000..ebb5100
--- /dev/null
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.scheduler;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
+import org.apache.zeppelin.resource.LocalResourcePool;
+import org.apache.zeppelin.scheduler.Job.Status;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
+
+  private static final String INTERPRETER_SCRIPT =
+          System.getProperty("os.name").startsWith("Windows") ?
+                  "../bin/interpreter.cmd" :
+                  "../bin/interpreter.sh";
+  private SchedulerFactory schedulerSvc;
+  private static final int TICK_WAIT = 100;
+  private static final int MAX_WAIT_CYCLES = 100;
+
+  @Before
+  public void setUp() throws Exception{
+    schedulerSvc = new SchedulerFactory();
+  }
+
+  @After
+  public void tearDown(){
+
+  }
+
+  @Test
+  public void test() throws Exception {
+    Properties p = new Properties();
+    final InterpreterGroup intpGroup = new InterpreterGroup();
+    Map<String, String> env = new HashMap<>();
+    env.put("ZEPPELIN_CLASSPATH", new 
File("./target/test-classes").getAbsolutePath());
+
+    final RemoteInterpreter intpA = new RemoteInterpreter(
+        p,
+        "note",
+        MockInterpreterA.class.getName(),
+        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+        "fake",
+        "fakeRepo",
+        env,
+        10 * 1000,
+        this,
+        null,
+        "anonymous",
+        false);
+
+    intpGroup.put("note", new LinkedList<Interpreter>());
+    intpGroup.get("note").add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+
+    Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", 
"note",
+        intpA.getInterpreterProcess(),
+        10);
+
+    Job job = new Job("jobId", "jobName", null, 200) {
+      Object results;
+      @Override
+      public Object getReturn() {
+        return results;
+      }
+
+      @Override
+      public int progress() {
+        return 0;
+      }
+
+      @Override
+      public Map<String, Object> info() {
+        return null;
+      }
+
+      @Override
+      protected Object jobRun() throws Throwable {
+        intpA.interpret("1000", new InterpreterContext(
+            "note",
+            "jobId",
+            null,
+            "title",
+            "text",
+            new AuthenticationInfo(),
+            new HashMap<String, Object>(),
+            new GUI(),
+            new AngularObjectRegistry(intpGroup.getId(), null),
+            new LocalResourcePool("pool1"),
+            new LinkedList<InterpreterContextRunner>(), null));
+        return "1000";
+      }
+
+      @Override
+      protected boolean jobAbort() {
+        return false;
+      }
+
+      @Override
+      public void setResult(Object results) {
+        this.results = results;
+      }
+    };
+    scheduler.submit(job);
+
+    int cycles = 0;
+    while (!job.isRunning() && cycles < MAX_WAIT_CYCLES) {
+      Thread.sleep(TICK_WAIT);
+      cycles++;
+    }
+    assertTrue(job.isRunning());
+
+    Thread.sleep(5*TICK_WAIT);
+    assertEquals(0, scheduler.getJobsWaiting().size());
+    assertEquals(1, scheduler.getJobsRunning().size());
+
+    cycles = 0;
+    while (!job.isTerminated() && cycles < MAX_WAIT_CYCLES) {
+      Thread.sleep(TICK_WAIT);
+      cycles++;
+    }
+
+    assertTrue(job.isTerminated());
+    assertEquals(0, scheduler.getJobsWaiting().size());
+    assertEquals(0, scheduler.getJobsRunning().size());
+
+    intpA.close();
+    schedulerSvc.removeScheduler("test");
+  }
+
+  @Test
+  public void testAbortOnPending() throws Exception {
+    Properties p = new Properties();
+    final InterpreterGroup intpGroup = new InterpreterGroup();
+    Map<String, String> env = new HashMap<>();
+    env.put("ZEPPELIN_CLASSPATH", new 
File("./target/test-classes").getAbsolutePath());
+
+    final RemoteInterpreter intpA = new RemoteInterpreter(
+        p,
+        "note",
+        MockInterpreterA.class.getName(),
+        new File(INTERPRETER_SCRIPT).getAbsolutePath(),
+        "fake",
+        "fakeRepo",
+        env,
+        10 * 1000,
+        this,
+        null,
+        "anonymous",
+        false);
+
+    intpGroup.put("note", new LinkedList<Interpreter>());
+    intpGroup.get("note").add(intpA);
+    intpA.setInterpreterGroup(intpGroup);
+
+    intpA.open();
+
+    Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", 
"note",
+        intpA.getInterpreterProcess(),
+        10);
+
+    Job job1 = new Job("jobId1", "jobName1", null, 200) {
+      Object results;
+      InterpreterContext context = new InterpreterContext(
+          "note",
+          "jobId1",
+          null,
+          "title",
+          "text",
+          new AuthenticationInfo(),
+          new HashMap<String, Object>(),
+          new GUI(),
+          new AngularObjectRegistry(intpGroup.getId(), null),
+          new LocalResourcePool("pool1"),
+          new LinkedList<InterpreterContextRunner>(), null);
+
+      @Override
+      public Object getReturn() {
+        return results;
+      }
+
+      @Override
+      public int progress() {
+        return 0;
+      }
+
+      @Override
+      public Map<String, Object> info() {
+        return null;
+      }
+
+      @Override
+      protected Object jobRun() throws Throwable {
+        intpA.interpret("1000", context);
+        return "1000";
+      }
+
+      @Override
+      protected boolean jobAbort() {
+        if (isRunning()) {
+          intpA.cancel(context);
+        }
+        return true;
+      }
+
+      @Override
+      public void setResult(Object results) {
+        this.results = results;
+      }
+    };
+
+    Job job2 = new Job("jobId2", "jobName2", null, 200) {
+      public Object results;
+      InterpreterContext context = new InterpreterContext(
+          "note",
+          "jobId2",
+          null,
+          "title",
+          "text",
+          new AuthenticationInfo(),
+          new HashMap<String, Object>(),
+          new GUI(),
+          new AngularObjectRegistry(intpGroup.getId(), null),
+          new LocalResourcePool("pool1"),
+          new LinkedList<InterpreterContextRunner>(), null);
+
+      @Override
+      public Object getReturn() {
+        return results;
+      }
+
+      @Override
+      public int progress() {
+        return 0;
+      }
+
+      @Override
+      public Map<String, Object> info() {
+        return null;
+      }
+
+      @Override
+      protected Object jobRun() throws Throwable {
+        intpA.interpret("1000", context);
+        return "1000";
+      }
+
+      @Override
+      protected boolean jobAbort() {
+        if (isRunning()) {
+          intpA.cancel(context);
+        }
+        return true;
+      }
+
+      @Override
+      public void setResult(Object results) {
+        this.results = results;
+      }
+    };
+
+    job2.setResult("result2");
+
+    scheduler.submit(job1);
+    scheduler.submit(job2);
+
+
+    int cycles = 0;
+    while (!job1.isRunning() && cycles < MAX_WAIT_CYCLES) {
+      Thread.sleep(TICK_WAIT);
+      cycles++;
+    }
+    assertTrue(job1.isRunning());
+    assertTrue(job2.getStatus() == Status.PENDING);
+
+    job2.abort();
+
+    cycles = 0;
+    while (!job1.isTerminated() && cycles < MAX_WAIT_CYCLES) {
+      Thread.sleep(TICK_WAIT);
+      cycles++;
+    }
+
+    assertNotNull(job1.getDateFinished());
+    assertTrue(job1.isTerminated());
+    assertNull(job2.getDateFinished());
+    assertTrue(job2.isTerminated());
+    assertEquals("result2", job2.getReturn());
+
+    intpA.close();
+    schedulerSvc.removeScheduler("test");
+  }
+
+  @Override
+  public void onOutputAppend(String noteId, String paragraphId, int index, 
String output) {
+
+  }
+
+  @Override
+  public void onOutputUpdated(String noteId, String paragraphId, int index, 
InterpreterResult.Type type, String output) {
+
+  }
+
+  @Override
+  public void onOutputClear(String noteId, String paragraphId) {
+
+  }
+
+  @Override
+  public void onMetaInfosReceived(String settingId, Map<String, String> 
metaInfos) {
+
+  }
+
+  @Override
+  public void onGetParagraphRunners(String noteId, String paragraphId, 
RemoteWorksEventListener callback) {
+    if (callback != null) {
+      callback.onFinished(new LinkedList<>());
+    }
+  }
+
+  @Override
+  public void onRemoteRunParagraph(String noteId, String PsaragraphID) throws 
Exception {
+  }
+
+  @Override
+  public void onParaInfosReceived(String noteId, String paragraphId, 
+      String interpreterSettingId, Map<String, String> metaInfos) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/resources/conf/interpreter.json
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/resources/conf/interpreter.json 
b/zeppelin-zengine/src/test/resources/conf/interpreter.json
deleted file mode 100644
index 9e26dfe..0000000
--- a/zeppelin-zengine/src/test/resources/conf/interpreter.json
+++ /dev/null
@@ -1 +0,0 @@
-{}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/resources/interpreter/mock/interpreter-setting.json
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/resources/interpreter/mock/interpreter-setting.json 
b/zeppelin-zengine/src/test/resources/interpreter/mock/interpreter-setting.json
new file mode 100644
index 0000000..65568ef
--- /dev/null
+++ 
b/zeppelin-zengine/src/test/resources/interpreter/mock/interpreter-setting.json
@@ -0,0 +1,12 @@
+[
+  {
+    "group": "mock11",
+    "name": "mock11",
+    "className": "org.apache.zeppelin.interpreter.mock.MockInterpreter11",
+    "properties": {
+    },
+    "editor": {
+      "language": "java"
+    }
+  }
+]

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/resources/interpreter/mock1/interpreter-setting.json
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/resources/interpreter/mock1/interpreter-setting.json
 
b/zeppelin-zengine/src/test/resources/interpreter/mock1/interpreter-setting.json
deleted file mode 100644
index 0e6fb21..0000000
--- 
a/zeppelin-zengine/src/test/resources/interpreter/mock1/interpreter-setting.json
+++ /dev/null
@@ -1,19 +0,0 @@
-[
-  {
-    "group": "mock1",
-    "name": "mock1",
-    "className": "org.apache.zeppelin.interpreter.mock.MockInterpreter1",
-    "properties": {
-    },
-    "option": {
-      "remote": false,
-      "port": -1,
-      "perNote": "shared",
-      "perUser": "shared",
-      "isExistingProcess": false,
-      "setPermission": false,
-      "users": [],
-      "isUserImpersonate": false
-    }
-  }
-]

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/resources/interpreter/mock2/interpreter-setting.json
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/resources/interpreter/mock2/interpreter-setting.json
 
b/zeppelin-zengine/src/test/resources/interpreter/mock2/interpreter-setting.json
deleted file mode 100644
index aca418a..0000000
--- 
a/zeppelin-zengine/src/test/resources/interpreter/mock2/interpreter-setting.json
+++ /dev/null
@@ -1,19 +0,0 @@
-[
-  {
-    "group": "mock2",
-    "name": "mock2",
-    "className": "org.apache.zeppelin.interpreter.mock.MockInterpreter2",
-    "properties": {
-    },
-    "option": {
-      "remote": false,
-      "port": -1,
-      "perNote": "shared",
-      "perUser": "isolated",
-      "isExistingProcess": false,
-      "setPermission": false,
-      "users": [],
-      "isUserImpersonate": false
-    }
-  }
-]

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/resources/interpreter/mock_resource_pool/interpreter-setting.json
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/resources/interpreter/mock_resource_pool/interpreter-setting.json
 
b/zeppelin-zengine/src/test/resources/interpreter/mock_resource_pool/interpreter-setting.json
deleted file mode 100644
index 4dfe0a7..0000000
--- 
a/zeppelin-zengine/src/test/resources/interpreter/mock_resource_pool/interpreter-setting.json
+++ /dev/null
@@ -1,19 +0,0 @@
-[
-  {
-    "group": "mock_resource_pool",
-    "name": "mock_resource_pool",
-    "className": 
"org.apache.zeppelin.interpreter.remote.mock.MockInterpreterResourcePool",
-    "properties": {
-    },
-    "option": {
-      "remote": true,
-      "port": -1,
-      "perNote": "shared",
-      "perUser": "shared",
-      "isExistingProcess": false,
-      "setPermission": false,
-      "users": [],
-      "isUserImpersonate": false
-    }
-  }
-]

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/2a379102/zeppelin-zengine/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/resources/log4j.properties 
b/zeppelin-zengine/src/test/resources/log4j.properties
index 74f619b..001a222 100644
--- a/zeppelin-zengine/src/test/resources/log4j.properties
+++ b/zeppelin-zengine/src/test/resources/log4j.properties
@@ -35,6 +35,7 @@ log4j.logger.org.apache.hadoop.mapred=WARN
 log4j.logger.org.apache.hadoop.hive.ql=WARN
 log4j.logger.org.apache.hadoop.hive.metastore=WARN
 log4j.logger.org.apache.haadoop.hive.service.HiveServer=WARN
+log4j.logger.org.apache.zeppelin.scheduler=WARN
 
 log4j.logger.org.quartz=WARN
 log4j.logger.DataNucleus=WARN
@@ -44,6 +45,4 @@ log4j.logger.DataNucleus.Datastore=ERROR
 # Log all JDBC parameters
 log4j.logger.org.hibernate.type=ALL
 
-log4j.logger.org.apache.zeppelin.interpreter=DEBUG
-log4j.logger.org.apache.zeppelin.scheduler=DEBUG
 

Reply via email to