Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master 738c10e21 -> b88f52e3c


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
index af0c2f6..33a3ca6 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
@@ -45,14 +45,16 @@ public class RemoteScheduler implements Scheduler {
   boolean terminate = false;
   private String name;
   private int maxConcurrency;
+  private final String noteId;
   private RemoteInterpreterProcess interpreterProcess;
 
-  public RemoteScheduler(String name, ExecutorService executor,
+  public RemoteScheduler(String name, ExecutorService executor, String noteId,
       RemoteInterpreterProcess interpreterProcess, SchedulerListener listener,
       int maxConcurrency) {
     this.name = name;
     this.executor = executor;
     this.listener = listener;
+    this.noteId = noteId;
     this.interpreterProcess = interpreterProcess;
     this.maxConcurrency = maxConcurrency;
   }
@@ -257,7 +259,7 @@ public class RemoteScheduler implements Scheduler {
 
       boolean broken = false;
       try {
-        String statusStr = client.getStatus(job.getId());
+        String statusStr = client.getStatus(noteId, job.getId());
         if ("Unknown".equals(statusStr)) {
           // not found this job in the remote schedulers.
           // maybe not submitted, maybe already finished

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
index 8b47145..20b4b8a 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
@@ -86,6 +86,7 @@ public class SchedulerFactory implements SchedulerListener {
 
   public Scheduler createOrGetRemoteScheduler(
       String name,
+      String noteId,
       RemoteInterpreterProcess interpreterProcess,
       int maxConcurrency) {
 
@@ -94,6 +95,7 @@ public class SchedulerFactory implements SchedulerListener {
         Scheduler s = new RemoteScheduler(
             name,
             executor,
+            noteId,
             interpreterProcess,
             this,
             maxConcurrency);

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift 
b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
index d288324..224433d 100644
--- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
+++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
@@ -56,18 +56,18 @@ struct RemoteInterpreterEvent {
 }
 
 service RemoteInterpreterService {
-  void createInterpreter(1: string intpGroupId, 2: string className, 3: 
map<string, string> properties);
+  void createInterpreter(1: string intpGroupId, 2: string noteId, 3: string 
className, 4: map<string, string> properties);
 
-  void open(1: string className);
-  void close(1: string className);
-  RemoteInterpreterResult interpret(1: string className, 2: string st, 3: 
RemoteInterpreterContext interpreterContext);
-  void cancel(1: string className, 2: RemoteInterpreterContext 
interpreterContext);
-  i32 getProgress(1: string className, 2: RemoteInterpreterContext 
interpreterContext);
-  string getFormType(1: string className);
-  list<string> completion(1: string className, 2: string buf, 3: i32 cursor);
+  void open(1: string noteId, 2: string className);
+  void close(1: string noteId, 2: string className);
+  RemoteInterpreterResult interpret(1: string noteId, 2: string className, 3: 
string st, 4: RemoteInterpreterContext interpreterContext);
+  void cancel(1: string noteId, 2: string className, 3: 
RemoteInterpreterContext interpreterContext);
+  i32 getProgress(1: string noteId, 2: string className, 3: 
RemoteInterpreterContext interpreterContext);
+  string getFormType(1: string noteId, 2: string className);
+  list<string> completion(1: string noteId, 2: string className, 3: string 
buf, 4: i32 cursor);
   void shutdown();
 
-  string getStatus(1:string jobId);
+  string getStatus(1: string noteId, 2:string jobId);
 
   RemoteInterpreterEvent getEvent();
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
index bd8f436..84327dd 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
@@ -61,6 +61,7 @@ public class RemoteAngularObjectTest implements 
AngularObjectRegistryListener {
 
     intp = new RemoteInterpreter(
         p,
+        "note",
         MockInterpreterAngular.class.getName(),
         new File("../bin/interpreter.sh").getAbsolutePath(),
         "fake",
@@ -70,7 +71,8 @@ public class RemoteAngularObjectTest implements 
AngularObjectRegistryListener {
         null
     );
 
-    intpGroup.add(intp);
+    intpGroup.put("note", new LinkedList<Interpreter>());
+    intpGroup.get("note").add(intp);
     intp.setInterpreterGroup(intpGroup);
 
     context = new InterpreterContext(

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
index c52055c..3fbf5bc 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
@@ -44,6 +44,8 @@ public class RemoteInterpreterOutputTestStream implements 
RemoteInterpreterProce
   @Before
   public void setUp() throws Exception {
     intpGroup = new InterpreterGroup();
+    intpGroup.put("note", new LinkedList<Interpreter>());
+
     env = new HashMap<String, String>();
     env.put("ZEPPELIN_CLASSPATH", new 
File("./target/test-classes").getAbsolutePath());
   }
@@ -57,6 +59,7 @@ public class RemoteInterpreterOutputTestStream implements 
RemoteInterpreterProce
   private RemoteInterpreter createMockInterpreter() {
     RemoteInterpreter intp = new RemoteInterpreter(
         new Properties(),
+        "note",
         MockInterpreterOutputStream.class.getName(),
         new File("../bin/interpreter.sh").getAbsolutePath(),
         "fake",
@@ -65,7 +68,7 @@ public class RemoteInterpreterOutputTestStream implements 
RemoteInterpreterProce
         10 * 1000,
         this);
 
-    intpGroup.add(intp);
+    intpGroup.get("note").add(intp);
     intp.setInterpreterGroup(intpGroup);
     return intp;
   }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
index 333e4b4..182b7a2 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
@@ -63,8 +63,13 @@ public class RemoteInterpreterTest {
   }
 
   private RemoteInterpreter createMockInterpreterA(Properties p) {
+    return createMockInterpreterA(p, "note");
+  }
+
+  private RemoteInterpreter createMockInterpreterA(Properties p, String 
noteId) {
     return new RemoteInterpreter(
             p,
+            noteId,
             MockInterpreterA.class.getName(),
             new File("../bin/interpreter.sh").getAbsolutePath(),
             "fake",
@@ -75,8 +80,13 @@ public class RemoteInterpreterTest {
   }
 
   private RemoteInterpreter createMockInterpreterB(Properties p) {
+    return createMockInterpreterB(p, "note");
+  }
+
+  private RemoteInterpreter createMockInterpreterB(Properties p, String 
noteId) {
     return new RemoteInterpreter(
             p,
+            noteId,
             MockInterpreterB.class.getName(),
             new File("../bin/interpreter.sh").getAbsolutePath(),
             "fake",
@@ -89,15 +99,17 @@ public class RemoteInterpreterTest {
   @Test
   public void testRemoteInterperterCall() throws TTransportException, 
IOException {
     Properties p = new Properties();
+    intpGroup.put("note", new LinkedList<Interpreter>());
 
     RemoteInterpreter intpA = createMockInterpreterA(p);
 
-    intpGroup.add(intpA);
+    intpGroup.get("note").add(intpA);
+
     intpA.setInterpreterGroup(intpGroup);
 
     RemoteInterpreter intpB = createMockInterpreterB(p);
 
-    intpGroup.add(intpB);
+    intpGroup.get("note").add(intpB);
     intpB.setInterpreterGroup(intpGroup);
 
 
@@ -108,10 +120,10 @@ public class RemoteInterpreterTest {
     assertEquals(0, process.getNumIdleClient());
     assertEquals(0, process.referenceCount());
 
-    intpA.open();
+    intpA.open(); // initialize all interpreters in the same group
     assertTrue(process.isRunning());
     assertEquals(1, process.getNumIdleClient());
-    assertEquals(1, process.referenceCount());
+    assertEquals(2, process.referenceCount());
 
     intpA.interpret("1",
         new InterpreterContext(
@@ -144,7 +156,8 @@ public class RemoteInterpreterTest {
 
     RemoteInterpreter intpA = createMockInterpreterA(p);
 
-    intpGroup.add(intpA);
+    intpGroup.put("note", new LinkedList<Interpreter>());
+    intpGroup.get("note").add(intpA);
     intpA.setInterpreterGroup(intpGroup);
 
     intpA.open();
@@ -167,9 +180,11 @@ public class RemoteInterpreterTest {
   @Test
   public void testRemoteSchedulerSharing() throws TTransportException, 
IOException {
     Properties p = new Properties();
+    intpGroup.put("note", new LinkedList<Interpreter>());
 
     RemoteInterpreter intpA = new RemoteInterpreter(
         p,
+        "note",
         MockInterpreterA.class.getName(),
         new File("../bin/interpreter.sh").getAbsolutePath(),
         "fake",
@@ -178,11 +193,13 @@ public class RemoteInterpreterTest {
         10 * 1000,
         null);
 
-    intpGroup.add(intpA);
+
+    intpGroup.get("note").add(intpA);
     intpA.setInterpreterGroup(intpGroup);
 
     RemoteInterpreter intpB = new RemoteInterpreter(
         p,
+        "note",
         MockInterpreterB.class.getName(),
         new File("../bin/interpreter.sh").getAbsolutePath(),
         "fake",
@@ -191,7 +208,7 @@ public class RemoteInterpreterTest {
         10 * 1000,
         null);
 
-    intpGroup.add(intpB);
+    intpGroup.get("note").add(intpB);
     intpB.setInterpreterGroup(intpGroup);
 
     intpA.open();
@@ -236,15 +253,16 @@ public class RemoteInterpreterTest {
   @Test
   public void testRemoteSchedulerSharingSubmit() throws TTransportException, 
IOException, InterruptedException {
     Properties p = new Properties();
+    intpGroup.put("note", new LinkedList<Interpreter>());
 
     final RemoteInterpreter intpA = createMockInterpreterA(p);
 
-    intpGroup.add(intpA);
+    intpGroup.get("note").add(intpA);
     intpA.setInterpreterGroup(intpGroup);
 
     final RemoteInterpreter intpB = createMockInterpreterB(p);
 
-    intpGroup.add(intpB);
+    intpGroup.get("note").add(intpB);
     intpB.setInterpreterGroup(intpGroup);
 
     intpA.open();
@@ -322,13 +340,11 @@ public class RemoteInterpreterTest {
 
     };
     intpB.getScheduler().submit(jobB);
-
     // wait until both job finished
     while (jobA.getStatus() != Status.FINISHED ||
            jobB.getStatus() != Status.FINISHED) {
       Thread.sleep(100);
     }
-
     long end = System.currentTimeMillis();
     assertTrue(end - start >= 1000);
 
@@ -341,10 +357,11 @@ public class RemoteInterpreterTest {
   @Test
   public void testRunOrderPreserved() throws InterruptedException {
     Properties p = new Properties();
+    intpGroup.put("note", new LinkedList<Interpreter>());
 
     final RemoteInterpreter intpA = createMockInterpreterA(p);
 
-    intpGroup.add(intpA);
+    intpGroup.get("note").add(intpA);
     intpA.setInterpreterGroup(intpGroup);
 
     intpA.open();
@@ -417,10 +434,11 @@ public class RemoteInterpreterTest {
   public void testRunParallel() throws InterruptedException {
     Properties p = new Properties();
     p.put("parallel", "true");
+    intpGroup.put("note", new LinkedList<Interpreter>());
 
     final RemoteInterpreter intpA = createMockInterpreterA(p);
 
-    intpGroup.add(intpA);
+    intpGroup.get("note").add(intpA);
     intpA.setInterpreterGroup(intpGroup);
 
     intpA.open();
@@ -507,6 +525,7 @@ public class RemoteInterpreterTest {
   @Test
   public void testInterpreterGroupResetAfterProcessFinished() {
     Properties p = new Properties();
+    intpGroup.put("note", new LinkedList<Interpreter>());
 
     RemoteInterpreter intpA = createMockInterpreterA(p);
 
@@ -525,10 +544,11 @@ public class RemoteInterpreterTest {
   @Test
   public void testInterpreterGroupResetDuringProcessRunning() throws 
InterruptedException {
     Properties p = new Properties();
+    intpGroup.put("note", new LinkedList<Interpreter>());
 
     final RemoteInterpreter intpA = createMockInterpreterA(p);
 
-    intpGroup.add(intpA);
+    intpGroup.get("note").add(intpA);
     intpA.setInterpreterGroup(intpGroup);
 
     intpA.open();
@@ -577,7 +597,12 @@ public class RemoteInterpreterTest {
     // restart interpreter
     RemoteInterpreterProcess processA = intpA.getInterpreterProcess();
     intpA.close();
-    intpA.setInterpreterGroup(new 
InterpreterGroup(intpA.getInterpreterGroup().getId()));
+
+    InterpreterGroup newInterpreterGroup =
+        new InterpreterGroup(intpA.getInterpreterGroup().getId());
+    newInterpreterGroup.put("note", new LinkedList<Interpreter>());
+
+    intpA.setInterpreterGroup(newInterpreterGroup);
     intpA.open();
     RemoteInterpreterProcess processB = intpA.getInterpreterProcess();
 
@@ -588,15 +613,16 @@ public class RemoteInterpreterTest {
   @Test
   public void 
testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() {
     Properties p = new Properties();
+    intpGroup.put("note", new LinkedList<Interpreter>());
 
     RemoteInterpreter intpA = createMockInterpreterA(p);
 
-    intpGroup.add(intpA);
+    intpGroup.get("note").add(intpA);
     intpA.setInterpreterGroup(intpGroup);
 
     RemoteInterpreter intpB = createMockInterpreterB(p);
 
-    intpGroup.add(intpB);
+    intpGroup.get("note").add(intpB);
     intpB.setInterpreterGroup(intpGroup);
 
     intpA.open();
@@ -604,4 +630,38 @@ public class RemoteInterpreterTest {
 
     assertEquals(intpA.getScheduler(), intpB.getScheduler());
   }
+
+  @Test
+  public void testMultiInterpreterSession() {
+    Properties p = new Properties();
+    intpGroup.put("sessionA", new LinkedList<Interpreter>());
+    intpGroup.put("sessionB", new LinkedList<Interpreter>());
+
+    RemoteInterpreter intpAsessionA = createMockInterpreterA(p, "sessionA");
+    intpGroup.get("sessionA").add(intpAsessionA);
+    intpAsessionA.setInterpreterGroup(intpGroup);
+
+    RemoteInterpreter intpBsessionA = createMockInterpreterB(p, "sessionA");
+    intpGroup.get("sessionA").add(intpBsessionA);
+    intpBsessionA.setInterpreterGroup(intpGroup);
+
+    intpAsessionA.open();
+    intpBsessionA.open();
+
+    assertEquals(intpAsessionA.getScheduler(), intpBsessionA.getScheduler());
+
+    RemoteInterpreter intpAsessionB = createMockInterpreterA(p, "sessionB");
+    intpGroup.get("sessionB").add(intpAsessionB);
+    intpAsessionB.setInterpreterGroup(intpGroup);
+
+    RemoteInterpreter intpBsessionB = createMockInterpreterB(p, "sessionB");
+    intpGroup.get("sessionB").add(intpBsessionB);
+    intpBsessionB.setInterpreterGroup(intpGroup);
+
+    intpAsessionB.open();
+    intpBsessionB.open();
+
+    assertEquals(intpAsessionB.getScheduler(), intpBsessionB.getScheduler());
+    assertNotEquals(intpAsessionA.getScheduler(), 
intpAsessionB.getScheduler());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
index c7097f2..fa6ff7e 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterB.java
@@ -91,13 +91,30 @@ public class MockInterpreterB extends Interpreter {
 
   public MockInterpreterA getInterpreterA() {
     InterpreterGroup interpreterGroup = getInterpreterGroup();
-    for (Interpreter intp : interpreterGroup) {
-      if (intp.getClassName().equals(MockInterpreterA.class.getName())) {
-        Interpreter p = intp;
-        while (p instanceof WrappedInterpreter) {
-          p = ((WrappedInterpreter) p).getInnerInterpreter();
+    synchronized (interpreterGroup) {
+      for (List<Interpreter> interpreters : interpreterGroup.values()) {
+        boolean belongsToSameNoteGroup = false;
+        MockInterpreterA a = null;
+        for (Interpreter intp : interpreters) {
+          if (intp.getClassName().equals(MockInterpreterA.class.getName())) {
+            Interpreter p = intp;
+            while (p instanceof WrappedInterpreter) {
+              p = ((WrappedInterpreter) p).getInnerInterpreter();
+            }
+            a = (MockInterpreterA) p;
+          }
+
+          Interpreter p = intp;
+          while (p instanceof WrappedInterpreter) {
+            p = ((WrappedInterpreter) p).getInnerInterpreter();
+          }
+          if (this == p) {
+            belongsToSameNoteGroup = true;
+          }
+        }
+        if (belongsToSameNoteGroup) {
+          return a;
         }
-        return (MockInterpreterA) p;
       }
     }
     return null;
@@ -105,13 +122,10 @@ public class MockInterpreterB extends Interpreter {
 
   @Override
   public Scheduler getScheduler() {
-    InterpreterGroup interpreterGroup = getInterpreterGroup();
-    for (Interpreter intp : interpreterGroup) {
-      if (intp.getClassName().equals(MockInterpreterA.class.getName())) {
-        return intp.getScheduler();
-      }
+    MockInterpreterA intpA = getInterpreterA();
+    if (intpA != null) {
+      return intpA.getScheduler();
     }
-
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
index 138c1e4..a99fde2 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
@@ -58,6 +58,7 @@ public class DistributedResourcePoolTest {
 
     intp1 = new RemoteInterpreter(
         p,
+        "note",
         MockInterpreterResourcePool.class.getName(),
         new File("../bin/interpreter.sh").getAbsolutePath(),
         "fake",
@@ -68,11 +69,13 @@ public class DistributedResourcePoolTest {
     );
 
     intpGroup1 = new InterpreterGroup("intpGroup1");
-    intpGroup1.add(intp1);
+    intpGroup1.put("note", new LinkedList<Interpreter>());
+    intpGroup1.get("note").add(intp1);
     intp1.setInterpreterGroup(intpGroup1);
 
     intp2 = new RemoteInterpreter(
         p,
+        "note",
         MockInterpreterResourcePool.class.getName(),
         new File("../bin/interpreter.sh").getAbsolutePath(),
         "fake",
@@ -83,7 +86,8 @@ public class DistributedResourcePoolTest {
     );
 
     intpGroup2 = new InterpreterGroup("intpGroup2");
-    intpGroup2.add(intp2);
+    intpGroup2.put("note", new LinkedList<Interpreter>());
+    intpGroup2.get("note").add(intp2);
     intp2.setInterpreterGroup(intpGroup2);
 
     context = new InterpreterContext(

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
index 5acfcc1..9ce7a65 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
@@ -31,6 +31,7 @@ import java.util.Properties;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
@@ -68,6 +69,7 @@ public class RemoteSchedulerTest implements 
RemoteInterpreterProcessListener {
 
     final RemoteInterpreter intpA = new RemoteInterpreter(
         p,
+        "note",
         MockInterpreterA.class.getName(),
         new File("../bin/interpreter.sh").getAbsolutePath(),
         "fake",
@@ -76,12 +78,13 @@ public class RemoteSchedulerTest implements 
RemoteInterpreterProcessListener {
         10 * 1000,
         this);
 
-    intpGroup.add(intpA);
+    intpGroup.put("note", new LinkedList<Interpreter>());
+    intpGroup.get("note").add(intpA);
     intpA.setInterpreterGroup(intpGroup);
 
     intpA.open();
 
-    Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test",
+    Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", 
"note",
         intpA.getInterpreterProcess(),
         10);
 
@@ -154,6 +157,7 @@ public class RemoteSchedulerTest implements 
RemoteInterpreterProcessListener {
 
     final RemoteInterpreter intpA = new RemoteInterpreter(
         p,
+        "note",
         MockInterpreterA.class.getName(),
         new File("../bin/interpreter.sh").getAbsolutePath(),
         "fake",
@@ -162,12 +166,13 @@ public class RemoteSchedulerTest implements 
RemoteInterpreterProcessListener {
         10 * 1000,
         this);
 
-    intpGroup.add(intpA);
+    intpGroup.put("note", new LinkedList<Interpreter>());
+    intpGroup.get("note").add(intpA);
     intpA.setInterpreterGroup(intpGroup);
 
     intpA.open();
 
-    Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test",
+    Scheduler scheduler = schedulerSvc.createOrGetRemoteScheduler("test", 
"note",
         intpA.getInterpreterProcess(),
         10);
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
 
b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
index 50d7d64..5bf42b4 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
@@ -94,11 +94,10 @@ public class InterpreterRestApi {
           NewInterpreterSettingRequest.class);
       Properties p = new Properties();
       p.putAll(request.getProperties());
-      // Option is deprecated from API, always use remote = true
       InterpreterGroup interpreterGroup = 
interpreterFactory.add(request.getName(),
           request.getGroup(),
           request.getDependencies(),
-          new InterpreterOption(true),
+          request.getOption(),
           p);
       InterpreterSetting setting = 
interpreterFactory.get(interpreterGroup.getId());
       logger.info("new setting created with {}", setting.id());
@@ -126,9 +125,8 @@ public class InterpreterRestApi {
     try {
       UpdateInterpreterSettingRequest request = gson.fromJson(message,
           UpdateInterpreterSettingRequest.class);
-      // Option is deprecated from API, always use remote = true
       interpreterFactory.setPropertyAndRestart(settingId,
-          new InterpreterOption(true),
+          request.getOption(),
           request.getProperties(),
           request.getDependencies());
     } catch (InterpreterException e) {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java 
b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
index be46f13..2459af8 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
@@ -165,7 +165,7 @@ public class NotebookRestApi {
           setting.id(),
           setting.getName(),
           setting.getGroup(),
-          setting.getInterpreterGroup(),
+          setting.getInterpreterInfos(),
           true)
       );
     }
@@ -185,7 +185,7 @@ public class NotebookRestApi {
             setting.id(),
             setting.getName(),
             setting.getGroup(),
-            setting.getInterpreterGroup(),
+            setting.getInterpreterInfos(),
             false)
         );
       }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/InterpreterSettingListForNoteBind.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/InterpreterSettingListForNoteBind.java
 
b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/InterpreterSettingListForNoteBind.java
index b74054c..e0ddacb 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/InterpreterSettingListForNoteBind.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/InterpreterSettingListForNoteBind.java
@@ -20,6 +20,7 @@ package org.apache.zeppelin.rest.message;
 import java.util.List;
 
 import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
 
 /**
  * InterpreterSetting information for binding
@@ -29,10 +30,12 @@ public class InterpreterSettingListForNoteBind {
   String name;
   String group;
   private boolean selected;
-  private List<Interpreter> interpreters;
+  private List<InterpreterSetting.InterpreterInfo> interpreters;
 
   public InterpreterSettingListForNoteBind(String id, String name,
-      String group, List<Interpreter> interpreters, boolean selected) {
+                                           String group,
+                                           
List<InterpreterSetting.InterpreterInfo> interpreters,
+                                           boolean selected) {
     super();
     this.id = id;
     this.name = name;
@@ -65,11 +68,11 @@ public class InterpreterSettingListForNoteBind {
     this.group = group;
   }
 
-  public List<Interpreter> getInterpreterNames() {
+  public List<InterpreterSetting.InterpreterInfo> getInterpreterNames() {
     return interpreters;
   }
 
-  public void setInterpreterNames(List<Interpreter> interpreters) {
+  public void setInterpreterNames(List<InterpreterSetting.InterpreterInfo> 
interpreters) {
     this.interpreters = interpreters;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewInterpreterSettingRequest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewInterpreterSettingRequest.java
 
b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewInterpreterSettingRequest.java
index 22eb25f..a559fb5 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewInterpreterSettingRequest.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewInterpreterSettingRequest.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.zeppelin.dep.Dependency;
+import org.apache.zeppelin.interpreter.InterpreterOption;
 
 /**
  *  NewInterpreterSetting rest api request message
@@ -29,9 +30,10 @@ import org.apache.zeppelin.dep.Dependency;
 public class NewInterpreterSettingRequest {
   String name;
   String group;
-  // option was deprecated
+
   Map<String, String> properties;
   List<Dependency> dependencies;
+  InterpreterOption option;
 
   public NewInterpreterSettingRequest() {
 
@@ -52,4 +54,8 @@ public class NewInterpreterSettingRequest {
   public List<Dependency> getDependencies() {
     return dependencies;
   }
+
+  public InterpreterOption getOption() {
+    return option;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/UpdateInterpreterSettingRequest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/UpdateInterpreterSettingRequest.java
 
b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/UpdateInterpreterSettingRequest.java
index a3f71ea..c2deeff 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/UpdateInterpreterSettingRequest.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/UpdateInterpreterSettingRequest.java
@@ -21,19 +21,21 @@ import java.util.List;
 import java.util.Properties;
 
 import org.apache.zeppelin.dep.Dependency;
+import org.apache.zeppelin.interpreter.InterpreterOption;
 
 /**
  * UpdateInterpreterSetting rest api request message
  */
 public class UpdateInterpreterSettingRequest {
-  // option was deprecated
   Properties properties;
   List<Dependency> dependencies;
+  InterpreterOption option;
 
   public UpdateInterpreterSettingRequest(Properties properties,
-      List<Dependency> dependencies) {
+      List<Dependency> dependencies, InterpreterOption option) {
     this.properties = properties;
     this.dependencies = dependencies;
+    this.option = option;
   }
 
   public Properties getProperties() {
@@ -43,4 +45,8 @@ public class UpdateInterpreterSettingRequest {
   public List<Dependency> getDependencies() {
     return dependencies;
   }
+
+  public InterpreterOption getOption() {
+    return option;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonExclusionStrategy.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonExclusionStrategy.java
 
b/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonExclusionStrategy.java
index 95ffd37..1eec8f3 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonExclusionStrategy.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonExclusionStrategy.java
@@ -23,17 +23,14 @@ import org.apache.zeppelin.interpreter.InterpreterOption;
 
 /**
  * Created by eranw on 8/30/15.
- * Omit InterpreterOption from serialization
  */
 public class JsonExclusionStrategy implements ExclusionStrategy {
 
   public boolean shouldSkipClass(Class<?> arg0) {
-    //exclude only InterpreterOption
-    return InterpreterOption.class.equals(arg0);
+    return false;
   }
 
   public boolean shouldSkipField(FieldAttributes f) {
-
     return false;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java 
b/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java
index 8c640ee..887d42a 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java
@@ -23,10 +23,11 @@ import javax.ws.rs.core.NewCookie;
 import javax.ws.rs.core.Response.ResponseBuilder;
 
 import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterSerializer;
+import org.apache.zeppelin.interpreter.InterpreterInfoSerializer;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
 
 /**
  * Json response builder.
@@ -98,8 +99,9 @@ public class JsonResponse<T> {
 
   @Override
   public String toString() {
-    GsonBuilder gsonBuilder = new GsonBuilder()
-      .registerTypeAdapter(Interpreter.class, new InterpreterSerializer());
+    GsonBuilder gsonBuilder = new GsonBuilder().registerTypeAdapter(
+        InterpreterSetting.InterpreterInfo.class,
+        new InterpreterInfoSerializer());
     if (pretty) {
       gsonBuilder.setPrettyPrinting();
     }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java
index 8886add..3862717 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java
@@ -90,7 +90,8 @@ public class InterpreterRestApiTest extends 
AbstractTestRestApi {
     // Call Create Setting REST API
     String jsonRequest = 
"{\"name\":\"md2\",\"group\":\"md\",\"properties\":{\"propname\":\"propvalue\"},"
 +
         
"\"interpreterGroup\":[{\"class\":\"org.apache.zeppelin.markdown.Markdown\",\"name\":\"md\"}],"
 +
-        "\"dependencies\":[]}";
+        "\"dependencies\":[]," +
+        "\"option\": { \"remote\": true, \"perNoteSession\": false }}";
     PostMethod post = httpPost("/interpreter/setting/", jsonRequest);
     LOG.info("testSettingCRUD create response\n" + 
post.getResponseBodyAsString());
     assertThat("test create method:", post, isCreated());
@@ -105,7 +106,8 @@ public class InterpreterRestApiTest extends 
AbstractTestRestApi {
     // Call Update Setting REST API
     jsonRequest = 
"{\"name\":\"md2\",\"group\":\"md\",\"properties\":{\"propname\":\"Otherpropvalue\"},"
 +
         
"\"interpreterGroup\":[{\"class\":\"org.apache.zeppelin.markdown.Markdown\",\"name\":\"md\"}],"
 +
-        "\"dependencies\":[]}";
+        "\"dependencies\":[]," +
+        "\"option\": { \"remote\": true, \"perNoteSession\": false }}";
     PutMethod put = httpPut("/interpreter/setting/" + newSettingId, 
jsonRequest);
     LOG.info("testSettingCRUD update response\n" + 
put.getResponseBodyAsString());
     assertThat("test update method:", put, isAllowed());

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
index 774f30a..15d8826 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
@@ -24,6 +24,7 @@ import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.notebook.Note;
 import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.notebook.Paragraph;
 import org.apache.zeppelin.rest.AbstractTestRestApi;
 import org.apache.zeppelin.server.ZeppelinServer;
 import org.apache.zeppelin.socket.Message.OP;
@@ -86,14 +87,17 @@ public class NotebookServerTest extends AbstractTestRestApi 
{
     InterpreterGroup interpreterGroup = null;
     List<InterpreterSetting> settings = 
note1.getNoteReplLoader().getInterpreterSettings();
     for (InterpreterSetting setting : settings) {
-      if (setting.getInterpreterGroup() == null) {
-        continue;
+      if (setting.getName().equals("md")) {
+        interpreterGroup = setting.getInterpreterGroup();
+        break;
       }
-
-      interpreterGroup = setting.getInterpreterGroup();
-      break;
     }
 
+    // start interpreter process
+    Paragraph p1 = note1.addParagraph();
+    p1.setText("%md start remote interpreter process");
+    note1.run(p1.getId());
+
     // add angularObject
     interpreterGroup.getAngularObjectRegistry().add("object1", "value1", 
note1.getId(), null);
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-web/src/app/interpreter/interpreter-create/interpreter-create.html
----------------------------------------------------------------------
diff --git 
a/zeppelin-web/src/app/interpreter/interpreter-create/interpreter-create.html 
b/zeppelin-web/src/app/interpreter/interpreter-create/interpreter-create.html
index 6f46c4e..f7769b4 100644
--- 
a/zeppelin-web/src/app/interpreter/interpreter-create/interpreter-create.html
+++ 
b/zeppelin-web/src/app/interpreter/interpreter-create/interpreter-create.html
@@ -35,6 +35,11 @@ limitations under the License.
           </select>
         </div>
 
+        <b>Option</b>
+        <!-- toggles option.perNoteSession of the setting being created -->
+        <div class="checkbox">
+          <label><input type="checkbox" style="top:-5px" 
ng-model="newInterpreterSetting.option.perNoteSession">Separate Interpreter for 
each note</label>
+        </div>
+        
         <b>Properties</b>
         <table class="table table-striped properties">
           <tr>

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-web/src/app/interpreter/interpreter.controller.js
----------------------------------------------------------------------
diff --git a/zeppelin-web/src/app/interpreter/interpreter.controller.js 
b/zeppelin-web/src/app/interpreter/interpreter.controller.js
index 1e3800a..ab66033 100644
--- a/zeppelin-web/src/app/interpreter/interpreter.controller.js
+++ b/zeppelin-web/src/app/interpreter/interpreter.controller.js
@@ -72,7 +72,17 @@ 
angular.module('zeppelinWebApp').controller('InterpreterCtrl', function($scope,
         $scope.addNewInterpreterDependency(settingId);
       }
 
+      // add missing field of option
+      if (!setting.option) {
+        setting.option = {};
+      }      
+      if (setting.option.remote === undefined) {
+        // remote always true for now
+        setting.option.remote = true;
+      }
+
       var request = {
+        option: angular.copy(setting.option),
         properties: angular.copy(setting.properties),
         dependencies: angular.copy(setting.dependencies)
       };
@@ -214,7 +224,11 @@ 
angular.module('zeppelinWebApp').controller('InterpreterCtrl', function($scope,
       name: undefined,
       group: undefined,
       properties: {},
-      dependencies: []
+      dependencies: [],
+      option: {
+        remote: true,
+        perNoteSession: false
+      }
     };
     emptyNewProperty($scope.newInterpreterSetting);
   };

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-web/src/app/interpreter/interpreter.html
----------------------------------------------------------------------
diff --git a/zeppelin-web/src/app/interpreter/interpreter.html 
b/zeppelin-web/src/app/interpreter/interpreter.html
index f1f9a02..c30ad61 100644
--- a/zeppelin-web/src/app/interpreter/interpreter.html
+++ b/zeppelin-web/src/app/interpreter/interpreter.html
@@ -109,6 +109,19 @@ limitations under the License.
       </div>
     </div>
     <div class="row interpreter">
+      <!-- per-setting option row; the checkbox is editable only while valueform is visible -->
+      <div class="col-md-12">
+        <h5>Option</h5>
+        <div class="checkbox">
+          <label>
+            <input type="checkbox"
+                   style="top:-5px"
+                   ng-disabled="!valueform.$visible"
+                   ng-model="setting.option.perNoteSession">
+            Separate Interpreter for each note
+          </label>
+        </div>
+      </div>
+      
       <div ng-show="_.isEmpty(setting.properties) && 
_.isEmpty(setting.dependencies) || valueform.$hidden" class="col-md-12 
gray40-message">
         <em>Currently there are no properties and dependencies set for this 
interpreter</em>
       </div>

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index 3761709..bf1377c 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -51,7 +51,6 @@ import java.util.*;
 
 /**
  * Manage interpreters.
- *
  */
 public class InterpreterFactory {
   Logger logger = LoggerFactory.getLogger(InterpreterFactory.class);
@@ -103,7 +102,8 @@ public class InterpreterFactory {
 
     GsonBuilder builder = new GsonBuilder();
     builder.setPrettyPrinting();
-    builder.registerTypeAdapter(Interpreter.class, new 
InterpreterSerializer());
+    builder.registerTypeAdapter(
+        InterpreterSetting.InterpreterInfo.class, new 
InterpreterInfoSerializer());
     gson = builder.create();
 
     init();
@@ -197,16 +197,14 @@ public class InterpreterFactory {
       InterpreterSetting setting = interpreterSettings.get(settingId);
       logger.info("Interpreter setting group {} : id={}, name={}",
           setting.getGroup(), settingId, setting.getName());
-      for (Interpreter interpreter : setting.getInterpreterGroup()) {
-        logger.info(" className = {}", interpreter.getClassName());
-      }
     }
   }
 
   private void loadFromFile() throws IOException {
     GsonBuilder builder = new GsonBuilder();
     builder.setPrettyPrinting();
-    builder.registerTypeAdapter(Interpreter.class, new 
InterpreterSerializer());
+    builder.registerTypeAdapter(
+        InterpreterSetting.InterpreterInfo.class, new 
InterpreterInfoSerializer());
     Gson gson = builder.create();
 
     File settingFile = new File(conf.getInterpreterSettingPath());
@@ -241,14 +239,12 @@ public class InterpreterFactory {
           setting.id(),
           setting.getName(),
           setting.getGroup(),
+          setting.getInterpreterInfos(),
+          setting.getProperties(),
           setting.getDependencies(),
           setting.getOption());
 
-      InterpreterGroup interpreterGroup = createInterpreterGroup(
-          setting.id(),
-          setting.getGroup(),
-          setting.getOption(),
-          setting.getProperties());
+      InterpreterGroup interpreterGroup = createInterpreterGroup(setting.id(), 
setting.getOption());
       intpSetting.setInterpreterGroup(interpreterGroup);
 
       interpreterSettings.put(k, intpSetting);
@@ -380,9 +376,27 @@ public class InterpreterFactory {
       throws InterpreterException, IOException, RepositoryException {
     synchronized (interpreterSettings) {
 
+      List<InterpreterSetting.InterpreterInfo> interpreterInfos =
+          new LinkedList<InterpreterSetting.InterpreterInfo>();
+
+      for (RegisteredInterpreter registeredInterpreter :
+          Interpreter.registeredInterpreters.values()) {
+        if (registeredInterpreter.getGroup().equals(groupName)) {
+          for (String className : interpreterClassList) {
+            if (registeredInterpreter.getClassName().equals(className)) {
+              interpreterInfos.add(
+                  new InterpreterSetting.InterpreterInfo(
+                      className, registeredInterpreter.getName()));
+            }
+          }
+        }
+      }
+
       InterpreterSetting intpSetting = new InterpreterSetting(
           name,
           groupName,
+          interpreterInfos,
+          properties,
           dependencies,
           option);
 
@@ -390,8 +404,7 @@ public class InterpreterFactory {
         loadInterpreterDependencies(intpSetting);
       }
 
-      InterpreterGroup interpreterGroup = createInterpreterGroup(
-          intpSetting.id(), groupName, option, properties);
+      InterpreterGroup interpreterGroup = 
createInterpreterGroup(intpSetting.id(), option);
 
       intpSetting.setInterpreterGroup(interpreterGroup);
 
@@ -401,18 +414,12 @@ public class InterpreterFactory {
     }
   }
 
-  private InterpreterGroup createInterpreterGroup(String id,
-      String groupName,
-      InterpreterOption option,
-      Properties properties)
+  private InterpreterGroup createInterpreterGroup(String id, InterpreterOption 
option)
       throws InterpreterException, NullArgumentException {
 
     //When called from REST API without option we receive NPE
     if (option == null)
       throw new NullArgumentException("option");
-    //When called from REST API without option we receive NPE
-    if (properties == null)
-      throw new NullArgumentException("properties");
 
     AngularObjectRegistry angularObjectRegistry;
 
@@ -430,19 +437,68 @@ public class InterpreterFactory {
     }
 
     interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
+    return interpreterGroup;
+  }
+
+  public void removeInterpretersForNote(InterpreterSetting interpreterSetting,
+                                        String noteId) {
+    if (!interpreterSetting.getOption().isPerNoteSession()) {
+      return;
+    }
+    InterpreterGroup interpreterGroup = 
interpreterSetting.getInterpreterGroup();
+    interpreterGroup.close(noteId);
+    interpreterGroup.destroy(noteId);
+    synchronized (interpreterGroup) {
+      interpreterGroup.remove(noteId);
+      interpreterGroup.notifyAll(); // notify createInterpreterForNote()
+    }
+    logger.info("Interpreter instance {} for note {} is removed",
+        interpreterSetting.getName(),
+        noteId);
+  }
+
+  /**
+   * Create the interpreter instances of an interpreter setting for a note
+   * and register them in the setting's interpreter group under the note id.
+   *
+   * If the group still holds instances for the note, this call waits until
+   * they are removed (see the notifyAll() in removeInterpretersForNote())
+   * or until the wait timeout elapses.
+   *
+   * @param interpreterSetting setting whose interpreters are instantiated
+   * @param noteId id of the note the instances are created for
+   * @throws InterpreterException if the previous instances for the note are
+   *         still registered when the wait timeout elapses
+   */
+  public void createInterpretersForNote(
+      InterpreterSetting interpreterSetting,
+      String noteId) {
+    InterpreterGroup interpreterGroup = 
interpreterSetting.getInterpreterGroup();
+    String groupName = interpreterSetting.getGroup();
+    InterpreterOption option = interpreterSetting.getOption();
+    Properties properties = interpreterSetting.getProperties();
+
+    // if interpreters are already there, wait until they're being removed
+    synchronized (interpreterGroup) {
+      long interpreterRemovalWaitStart = System.nanoTime();
+      // interpreter process supposed to be terminated by 
RemoteInterpreterProcess.dereference()
+      // in ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT msec. However, if termination 
of the process and
+      // removal from interpreter group take too long, throw an error.
+      //
+      // Both timeouts are kept in nanoseconds because they are compared
+      // against System.nanoTime() deltas. The arithmetic is done in long:
+      // 10 * 1000 * 1000000 evaluated as int overflows.
+      long nanosPerMilli = 1000000L;
+      long minTimeout = 10L * 1000 * nanosPerMilli; // 10 sec
+      long connectTimeoutNanos = nanosPerMilli
+          * conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
+      long interpreterRemovalWaitTimeout =
+          Math.max(minTimeout, connectTimeoutNanos * 2);
+      while (interpreterGroup.containsKey(noteId)) {
+        if (System.nanoTime() - interpreterRemovalWaitStart > 
interpreterRemovalWaitTimeout) {
+          throw new InterpreterException("Can not create interpreter");
+        }
+        try {
+          // wake up at least once a second to re-check the timeout
+          interpreterGroup.wait(1000);
+        } catch (InterruptedException e) {
+          logger.debug(e.getMessage(), e);
+        }
+      }
+    }
 
+    logger.info("Create interpreter instance {} for note {}", 
interpreterSetting.getName(), noteId);
 
     for (String className : interpreterClassList) {
       Set<String> keys = Interpreter.registeredInterpreters.keySet();
       for (String intName : keys) {
-        RegisteredInterpreter info = Interpreter.registeredInterpreters
-            .get(intName);
+        RegisteredInterpreter info = 
Interpreter.registeredInterpreters.get(intName);
         if (info.getClassName().equals(className)
             && info.getGroup().equals(groupName)) {
           Interpreter intp;
 
           if (option.isRemote()) {
             intp = createRemoteRepl(info.getPath(),
+                noteId,
                 info.getClassName(),
                 properties,
                 interpreterGroup.id);
@@ -451,15 +507,25 @@ public class InterpreterFactory {
                 info.getClassName(),
                 properties);
           }
-          interpreterGroup.add(intp);
+
+          synchronized (interpreterGroup) {
+            List<Interpreter> interpreters = interpreterGroup.get(noteId);
+            if (interpreters == null) {
+              interpreters = new LinkedList<Interpreter>();
+              interpreterGroup.put(noteId, interpreters);
+            }
+            interpreters.add(intp);
+          }
+          logger.info("Interpreter " + intp.getClassName() + " " + 
intp.hashCode() + " created");
           intp.setInterpreterGroup(interpreterGroup);
           break;
         }
       }
     }
-    return interpreterGroup;
   }
 
+
+
   public void remove(String id) throws IOException {
     synchronized (interpreterSettings) {
       if (interpreterSettings.containsKey(id)) {
@@ -486,7 +552,7 @@ public class InterpreterFactory {
   }
 
   /**
-   * Get loaded interpreters
+   * Get interpreter settings
    * @return
    */
   public List<InterpreterSetting> get() {
@@ -508,8 +574,8 @@ public class InterpreterFactory {
               continue;
             }
           }
+          for (InterpreterSetting.InterpreterInfo intp : 
setting.getInterpreterInfos()) {
 
-          for (Interpreter intp : setting.getInterpreterGroup()) {
             if (className.equals(intp.getClassName())) {
               boolean alreadyAdded = false;
               for (InterpreterSetting st : orderedSettings) {
@@ -536,15 +602,33 @@ public class InterpreterFactory {
 
   public void putNoteInterpreterSettingBinding(String noteId,
       List<String> settingList) throws IOException {
+    List<String> unBindedSettings = new LinkedList<String>();
+
     synchronized (interpreterSettings) {
+      List<String> oldSettings = interpreterBindings.get(noteId);
+      if (oldSettings != null) {
+        for (String oldSettingId : oldSettings) {
+          if (!settingList.contains(oldSettingId)) {
+            unBindedSettings.add(oldSettingId);
+          }
+        }
+      }
       interpreterBindings.put(noteId, settingList);
       saveToFile();
+
+      for (String settingId : unBindedSettings) {
+        InterpreterSetting setting = get(settingId);
+        removeInterpretersForNote(setting, noteId);
+      }
     }
   }
 
   public void removeNoteInterpreterSettingBinding(String noteId) {
     synchronized (interpreterSettings) {
-      interpreterBindings.remove(noteId);
+      List<String> settingIds = interpreterBindings.remove(noteId);
+      for (String settingId : settingIds) {
+        this.removeInterpretersForNote(get(settingId), noteId);
+      }
     }
   }
 
@@ -582,9 +666,7 @@ public class InterpreterFactory {
         intpsetting.setOption(option);
         intpsetting.setDependencies(dependencies);
 
-        InterpreterGroup interpreterGroup = createInterpreterGroup(
-            intpsetting.id(),
-            intpsetting.getGroup(), option, properties);
+        InterpreterGroup interpreterGroup = 
createInterpreterGroup(intpsetting.id(), option);
         intpsetting.setInterpreterGroup(interpreterGroup);
 
         loadInterpreterDependencies(intpsetting);
@@ -608,7 +690,7 @@ public class InterpreterFactory {
 
         InterpreterGroup interpreterGroup = createInterpreterGroup(
             intpsetting.id(),
-            intpsetting.getGroup(), intpsetting.getOption(), 
intpsetting.getProperties());
+            intpsetting.getOption());
         intpsetting.setInterpreterGroup(interpreterGroup);
       } else {
         throw new InterpreterException("Interpreter setting id " + id
@@ -619,16 +701,18 @@ public class InterpreterFactory {
 
   private void stopJobAllInterpreter(InterpreterSetting intpsetting) {
     if (intpsetting != null) {
-      for (Interpreter intp : intpsetting.getInterpreterGroup()) {
-        for (Job job : intp.getScheduler().getJobsRunning()) {
-          job.abort();
-          job.setStatus(Status.ABORT);
-          logger.info("Job " + job.getJobName() + " aborted ");
-        }
-        for (Job job : intp.getScheduler().getJobsWaiting()) {
-          job.abort();
-          job.setStatus(Status.ABORT);
-          logger.info("Job " + job.getJobName() + " aborted ");
+      for (List<Interpreter> interpreters : 
intpsetting.getInterpreterGroup().values()) {
+        for (Interpreter intp : interpreters) {
+          for (Job job : intp.getScheduler().getJobsRunning()) {
+            job.abort();
+            job.setStatus(Status.ABORT);
+            logger.info("Job " + job.getJobName() + " aborted ");
+          }
+          for (Job job : intp.getScheduler().getJobsWaiting()) {
+            job.abort();
+            job.setStatus(Status.ABORT);
+            logger.info("Job " + job.getJobName() + " aborted ");
+          }
         }
       }
     }
@@ -720,13 +804,13 @@ public class InterpreterFactory {
   }
 
 
-  private Interpreter createRemoteRepl(String interpreterPath, String 
className,
+  private Interpreter createRemoteRepl(String interpreterPath, String noteId, 
String className,
       Properties property, String interpreterId) {
     int connectTimeout = 
conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
     String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + 
interpreterId;
     int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
     LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter(
-        property, className, conf.getInterpreterRemoteRunnerPath(),
+        property, noteId, className, conf.getInterpreterRemoteRunnerPath(),
         interpreterPath, localRepoPath, connectTimeout,
         maxPoolSize, remoteInterpreterProcessListener));
     return intp;

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSerializer.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSerializer.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSerializer.java
new file mode 100644
index 0000000..5854983
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterInfoSerializer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+import java.lang.reflect.Type;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+
+/**
+ * Gson adapter for {@link InterpreterSetting.InterpreterInfo}.
+ * An InterpreterInfo is written and read as a JSON object with the two
+ * members "class" and "name".
+ */
+public class InterpreterInfoSerializer
+    implements JsonSerializer<InterpreterSetting.InterpreterInfo>,
+    JsonDeserializer<InterpreterSetting.InterpreterInfo> {
+
+  /**
+   * Write the class name and the name of the given InterpreterInfo.
+   *
+   * @return a JSON object holding the members "class" and "name"
+   */
+  @Override
+  public JsonElement serialize(
+      InterpreterSetting.InterpreterInfo interpreterInfo,
+      Type type,
+      JsonSerializationContext context) {
+    JsonObject json = new JsonObject();
+    json.addProperty("class", interpreterInfo.getClassName());
+    json.addProperty("name", interpreterInfo.getName());
+    return json;
+  }
+
+  /**
+   * Rebuild an InterpreterInfo from its JSON object form.
+   *
+   * @throws JsonParseException if json is not an object, or if "class" or
+   *         "name" is absent or not a primitive value
+   */
+  @Override
+  public InterpreterSetting.InterpreterInfo deserialize(
+      JsonElement json,
+      Type typeOfT,
+      JsonDeserializationContext context) throws JsonParseException {
+    if (json == null || !json.isJsonObject()) {
+      throw new JsonParseException("InterpreterInfo must be a JSON object");
+    }
+    JsonObject jsonObject = json.getAsJsonObject();
+    String className = readMember(jsonObject, "class");
+    String name = readMember(jsonObject, "name");
+
+    return new InterpreterSetting.InterpreterInfo(className, name);
+  }
+
+  /**
+   * Read a primitive member as a string, reporting a missing or malformed
+   * member as JsonParseException rather than a NullPointerException.
+   */
+  private static String readMember(JsonObject jsonObject, String member) {
+    JsonElement value = jsonObject.get(member);
+    if (value == null || !value.isJsonPrimitive()) {
+      throw new JsonParseException(
+          "InterpreterInfo requires a primitive member '" + member + "'");
+    }
+    return value.getAsString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java
index e2adecd..29a4748 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java
@@ -22,6 +22,7 @@ package org.apache.zeppelin.interpreter;
  */
 public class InterpreterOption {
   boolean remote;
+  boolean perNoteSession;
 
   public InterpreterOption() {
     remote = false;
@@ -38,4 +39,12 @@ public class InterpreterOption {
   public void setRemote(boolean remote) {
     this.remote = remote;
   }
+
+  public boolean isPerNoteSession() {
+    return perNoteSession;
+  }
+
+  public void setPerNoteSession(boolean perNoteSession) {
+    this.perNoteSession = perNoteSession;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSerializer.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSerializer.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSerializer.java
deleted file mode 100644
index a2deb7e..0000000
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSerializer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter;
-
-import java.lang.reflect.Type;
-
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
-
-
-/**
- * Interpreter class serializer for gson
- *
- */
-public class InterpreterSerializer implements JsonSerializer<Interpreter>,
-  JsonDeserializer<Interpreter> {
-
-  @Override
-  public JsonElement serialize(Interpreter interpreter, Type type,
-      JsonSerializationContext context) {
-    JsonObject json = new JsonObject();
-    json.addProperty("class", interpreter.getClassName());
-    json.addProperty(
-        "name",
-        Interpreter.findRegisteredInterpreterByClassName(
-            interpreter.getClassName()).getName());
-    return json;
-  }
-
-  @Override
-  public Interpreter deserialize(JsonElement json, Type typeOfT,
-      JsonDeserializationContext context) throws JsonParseException {
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index e8080a2..5c93e3a 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -34,27 +34,59 @@ public class InterpreterSetting {
   private String group;
   private String description;
   private Properties properties;
-  private InterpreterGroup interpreterGroup;
+
+  // use 'interpreterGroup' as the field name to stay backward compatible with
+  // the conf/interpreter.json file format
+  private List<InterpreterInfo> interpreterGroup;
+  private transient InterpreterGroup interpreterGroupRef;
   private List<Dependency> dependencies;
   private InterpreterOption option;
 
   public InterpreterSetting(String id,
       String name,
       String group,
+      List<InterpreterInfo> interpreterInfos,
+      Properties properties,
       List<Dependency> dependencies,
       InterpreterOption option) {
     this.id = id;
     this.name = name;
     this.group = group;
+    this.interpreterGroup = interpreterInfos;
+    this.properties = properties;
     this.dependencies = dependencies;
     this.option = option;
   }
 
   public InterpreterSetting(String name,
       String group,
+      List<InterpreterInfo> interpreterInfos,
+      Properties properties,
       List<Dependency> dependencies,
       InterpreterOption option) {
-    this(generateId(), name, group, dependencies, option);
+    this(generateId(), name, group, interpreterInfos, properties, 
dependencies, option);
+  }
+
+  /**
+   * Information about the interpreters in this interpreter setting.
+   * This is serialized into conf/interpreter.json and the REST API response.
+   */
+  public static class InterpreterInfo {
+    private final String name;
+    private final String className;
+
+    public InterpreterInfo(String className, String name) {
+      this.className = className;
+      this.name = name;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public String getClassName() {
+      return className;
+    }
   }
 
   public String id() {
@@ -86,12 +118,11 @@ public class InterpreterSetting {
   }
 
   public InterpreterGroup getInterpreterGroup() {
-    return interpreterGroup;
+    return interpreterGroupRef;
   }
 
   public void setInterpreterGroup(InterpreterGroup interpreterGroup) {
-    this.interpreterGroup = interpreterGroup;
-    this.properties = interpreterGroup.getProperty();
+    this.interpreterGroupRef = interpreterGroup;
   }
 
   public Properties getProperties() {
@@ -120,4 +151,8 @@ public class InterpreterSetting {
   public void setOption(InterpreterOption option) {
     this.option = option;
   }
+
+  public List<InterpreterInfo> getInterpreterInfos() {
+    return interpreterGroup;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java
index 509a064..4e7626b 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInterpreterLoader.java
@@ -29,10 +29,11 @@ import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 
 /**
- * Repl loader per note.
+ * Interpreter loader per note.
  */
 public class NoteInterpreterLoader {
   private transient InterpreterFactory factory;
+  private static String SHARED_SESSION = "shared_session";
   String noteId;
 
   public NoteInterpreterLoader(InterpreterFactory factory) {
@@ -73,6 +74,37 @@ public class NoteInterpreterLoader {
     return settings;
   }
 
+  private String getInterpreterGroupKey(InterpreterSetting setting) {
+    if (!setting.getOption().isPerNoteSession()) {
+      return SHARED_SESSION;
+    } else {
+      return noteId;
+    }
+  }
+
+  private List<Interpreter> createOrGetInterpreterList(InterpreterSetting 
setting) {
+    InterpreterGroup interpreterGroup = setting.getInterpreterGroup();
+    synchronized (interpreterGroup) {
+      String key = getInterpreterGroupKey(setting);
+      if (!interpreterGroup.containsKey(key)) {
+        factory.createInterpretersForNote(setting, key);
+      }
+      return interpreterGroup.get(getInterpreterGroupKey(setting));
+    }
+  }
+
+  public void close() {
+    // close interpreters in this note session
+    List<InterpreterSetting> settings = this.getInterpreterSettings();
+    if (settings == null || settings.size() == 0) {
+      return;
+    }
+
+    for (InterpreterSetting setting : settings) {
+      factory.removeInterpretersForNote(setting, noteId);
+    }
+  }
+
   public Interpreter get(String replName) {
     List<InterpreterSetting> settings = getInterpreterSettings();
 
@@ -81,7 +113,9 @@ public class NoteInterpreterLoader {
     }
 
     if (replName == null || replName.trim().length() == 0) {
-      return settings.get(0).getInterpreterGroup().getFirst();
+      // get default settings (first available)
+      InterpreterSetting defaultSettings = settings.get(0);
+      return createOrGetInterpreterList(defaultSettings).get(0);
     }
 
     if (Interpreter.registeredInterpreters == null) {
@@ -104,43 +138,47 @@ public class NoteInterpreterLoader {
       String interpreterClassName = registeredInterpreter.getClassName();
 
       for (InterpreterSetting setting : settings) {
-        InterpreterGroup intpGroup = setting.getInterpreterGroup();
-        for (Interpreter interpreter : intpGroup) {
-          if (interpreterClassName.equals(interpreter.getClassName())) {
-            return interpreter;
+        if (registeredInterpreter.getGroup().equals(setting.getGroup())) {
+          List<Interpreter> intpGroup = createOrGetInterpreterList(setting);
+          for (Interpreter interpreter : intpGroup) {
+            if (interpreterClassName.equals(interpreter.getClassName())) {
+              return interpreter;
+            }
           }
         }
       }
+      throw new InterpreterException(replName + " interpreter not found");
     } else {
       // first assume replName is 'name' of interpreter. ('groupName' is 
ommitted)
       // search 'name' from first (default) interpreter group
-      InterpreterGroup intpGroup = settings.get(0).getInterpreterGroup();
-      for (Interpreter interpreter : intpGroup) {
-        RegisteredInterpreter intp = Interpreter
-            .findRegisteredInterpreterByClassName(interpreter.getClassName());
-        if (intp == null) {
-          continue;
-        }
+      InterpreterSetting defaultSetting = settings.get(0);
+      Interpreter.RegisteredInterpreter registeredInterpreter =
+          Interpreter.registeredInterpreters.get(defaultSetting.getGroup() + 
"." + replName);
+      if (registeredInterpreter != null) {
+        List<Interpreter> interpreters = 
createOrGetInterpreterList(defaultSetting);
+        for (Interpreter interpreter : interpreters) {
+
+          RegisteredInterpreter intp =
+              
Interpreter.findRegisteredInterpreterByClassName(interpreter.getClassName());
+          if (intp == null) {
+            continue;
+          }
 
-        if (intp.getName().equals(replName)) {
-          return interpreter;
+          if (intp.getName().equals(replName)) {
+            return interpreter;
+          }
         }
-      }
 
+        throw new InterpreterException(
+            defaultSetting.getGroup() + "." + replName + " interpreter not 
found");
+      }
 
       // next, assume replName is 'group' of interpreter ('name' is ommitted)
       // search interpreter group and return first interpreter.
       for (InterpreterSetting setting : settings) {
-        intpGroup = setting.getInterpreterGroup();
-        Interpreter interpreter = intpGroup.get(0);
-        RegisteredInterpreter intp = Interpreter
-            .findRegisteredInterpreterByClassName(interpreter.getClassName());
-        if (intp == null) {
-          continue;
-        }
-
-        if (intp.getGroup().equals(replName)) {
-          return interpreter;
+        if (setting.getGroup().equals(replName)) {
+          List<Interpreter> interpreters = createOrGetInterpreterList(setting);
+          return interpreters.get(0);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index 14d2fd8..79482b8 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -244,7 +244,8 @@ public class Notebook {
     Note note = getNote(id);
     if (note != null) {
       note.getNoteReplLoader().setInterpreters(interpreterSettingIds);
-      replFactory.putNoteInterpreterSettingBinding(id, interpreterSettingIds);
+      // commented out because note.getNoteReplLoader().setInterpreters(...) does the same
+      // replFactory.putNoteInterpreterSettingBinding(id, 
interpreterSettingIds);
     }
   }
 
@@ -278,6 +279,7 @@ public class Notebook {
     synchronized (notes) {
       note = notes.remove(id);
     }
+    replFactory.removeNoteInterpreterSettingBinding(id);
     notebookIndex.deleteIndexDocs(note);
 
     // remove from all interpreter instance's angular object registry

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index ce7528f..75d105f 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -27,6 +27,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.resource.ResourcePool;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.JobListener;
+import org.apache.zeppelin.scheduler.Scheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -261,7 +262,17 @@ public class Paragraph extends Job implements 
Serializable, Cloneable {
   @Override
   protected boolean jobAbort() {
     Interpreter repl = getRepl(getRequiredReplName());
-    Job job = repl.getScheduler().removeFromWaitingQueue(getId());
+    if (repl == null) {
+      // when interpreters are already destroyed
+      return true;
+    }
+
+    Scheduler scheduler = repl.getScheduler();
+    if (scheduler == null) {
+      return true;
+    }
+
+    Job job = scheduler.removeFromWaitingQueue(getId());
     if (job != null) {
       job.setStatus(Status.ABORT);
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
index 7020a47..d215e7c 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
@@ -22,8 +22,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-import java.io.File;
-import java.io.IOException;
+import java.io.*;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
@@ -87,9 +86,12 @@ public class InterpreterFactoryTest {
   @Test
   public void testBasic() {
     List<String> all = factory.getDefaultInterpreterSettingList();
+    InterpreterSetting setting = factory.get(all.get(0));
+    InterpreterGroup interpreterGroup = setting.getInterpreterGroup();
+    factory.createInterpretersForNote(setting, "session");
 
     // get interpreter
-    Interpreter repl1 = 
factory.get(all.get(0)).getInterpreterGroup().getFirst();
+    Interpreter repl1 = interpreterGroup.get("session").get(0);
     assertFalse(((LazyOpenInterpreter) repl1).isOpen());
     repl1.interpret("repl1", context);
     assertTrue(((LazyOpenInterpreter) repl1).isOpen());
@@ -99,23 +101,14 @@ public class InterpreterFactoryTest {
 
     // restart interpreter
     factory.restart(all.get(0));
-    repl1 = factory.get(all.get(0)).getInterpreterGroup().getFirst();
-    assertFalse(((LazyOpenInterpreter) repl1).isOpen());
+    assertNull(setting.getInterpreterGroup().get("session"));
   }
 
   @Test
   public void testFactoryDefaultList() throws IOException, RepositoryException 
{
-    // get default list from default setting
+    // get default settings
     List<String> all = factory.getDefaultInterpreterSettingList();
     assertEquals(2, all.size());
-    
assertEquals(factory.get(all.get(0)).getInterpreterGroup().getFirst().getClassName(),
 "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
-
-    // add setting
-    factory.add("a mock", "mock2", new LinkedList<Dependency>(), new 
InterpreterOption(false), new Properties());
-    all = factory.getDefaultInterpreterSettingList();
-    assertEquals(2, all.size());
-    assertEquals("mock1", factory.get(all.get(0)).getName());
-    assertEquals("a mock", factory.get(all.get(1)).getName());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
index 09259b1..079846c 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
@@ -45,7 +45,13 @@ public class MockInterpreter1 extends Interpreter{
 
        @Override
        public InterpreterResult interpret(String st, InterpreterContext 
context) {
-               return new InterpreterResult(InterpreterResult.Code.SUCCESS, 
"repl1: "+st);
+
+               if ("getId".equals(st)) {
+                       // get unique id of this interpreter instance
+                       return new 
InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode());
+               } else {
+                       return new 
InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: " + st);
+               }
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
index 04a6ceb..aa98afd 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteInterpreterLoaderTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.zeppelin.notebook;
 
-import static org.junit.Assert.assertEquals;
-
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
@@ -36,6 +34,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
+
 public class NoteInterpreterLoaderTest {
 
   private File tmpDir;
@@ -92,6 +92,43 @@ public class NoteInterpreterLoaderTest {
     assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter1", 
loader.get("group1.mock1").getClassName());
     assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter11", 
loader.get("group1.mock11").getClassName());
     assertEquals("org.apache.zeppelin.interpreter.mock.MockInterpreter2", 
loader.get("group2.mock2").getClassName());
+
+    loader.close();
+  }
+
+  @Test
+  public void testNoteSession() throws IOException {
+    NoteInterpreterLoader loaderA = new NoteInterpreterLoader(factory);
+    loaderA.setNoteId("noteA");
+    loaderA.setInterpreters(factory.getDefaultInterpreterSettingList());
+    
loaderA.getInterpreterSettings().get(0).getOption().setPerNoteSession(true);
+
+    NoteInterpreterLoader loaderB = new NoteInterpreterLoader(factory);
+    loaderB.setNoteId("noteB");
+    loaderB.setInterpreters(factory.getDefaultInterpreterSettingList());
+    
loaderB.getInterpreterSettings().get(0).getOption().setPerNoteSession(true);
+
+    // interpreters are not created until they are first accessed
+    
assertNull(loaderA.getInterpreterSettings().get(0).getInterpreterGroup().get("noteA"));
+    
assertNull(loaderB.getInterpreterSettings().get(0).getInterpreterGroup().get("noteB"));
+
+    // per note session interpreter instance in the same interpreter process
+    assertTrue(
+        loaderA.get(null).getInterpreterGroup().getRemoteInterpreterProcess() 
==
+        loaderB.get(null).getInterpreterGroup().getRemoteInterpreterProcess());
+
+    // interpreters are created once they have been accessed
+    
assertNotNull(loaderA.getInterpreterSettings().get(0).getInterpreterGroup().get("noteA"));
+    
assertNotNull(loaderB.getInterpreterSettings().get(0).getInterpreterGroup().get("noteB"));
+
+    // when
+    loaderA.close();
+    loaderB.close();
+
+    // interpreters are destroyed after close
+    
assertNull(loaderA.getInterpreterSettings().get(0).getInterpreterGroup().get("noteA"));
+    
assertNull(loaderB.getInterpreterSettings().get(0).getInterpreterGroup().get("noteB"));
+
   }
 
   private void delete(File file){

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index 0b5af2c..db7a466 100644
--- 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -26,23 +26,14 @@ import static org.mockito.Mockito.mock;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.HashSet;
+import java.util.*;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
 import org.apache.zeppelin.dep.DependencyResolver;
 import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.interpreter.InterpreterFactory;
-import org.apache.zeppelin.interpreter.InterpreterOption;
-import org.apache.zeppelin.interpreter.InterpreterOutput;
-import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
 import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
 import org.apache.zeppelin.notebook.repo.NotebookRepo;
@@ -489,6 +480,109 @@ public class NotebookTest implements JobListenerFactory{
     assertTrue(isAborted);
   }
 
+  @Test
+  public void testPerSessionInterpreterCloseOnNoteRemoval() throws IOException 
{
+    // create a note
+    Note note1  = notebook.createNote();
+    Paragraph p1 = note1.addParagraph();
+    p1.setText("getId");
+
+    // restart interpreter with per note session enabled
+    for (InterpreterSetting setting : 
note1.getNoteReplLoader().getInterpreterSettings()) {
+      setting.getOption().setPerNoteSession(true);
+      notebook.getInterpreterFactory().restart(setting.id());
+    }
+
+    note1.run(p1.getId());
+    while (p1.getStatus() != Status.FINISHED) Thread.yield();
+    InterpreterResult result = p1.getResult();
+
+    // remove note and recreate
+    notebook.removeNote(note1.getId());
+    note1 = notebook.createNote();
+    p1 = note1.addParagraph();
+    p1.setText("getId");
+
+    note1.run(p1.getId());
+    while (p1.getStatus() != Status.FINISHED) Thread.yield();
+    assertNotEquals(p1.getResult().message(), result.message());
+
+    notebook.removeNote(note1.getId());
+  }
+
+  @Test
+  public void testPerSessionInterpreter() throws IOException {
+    // create two notes
+    Note note1  = notebook.createNote();
+    Paragraph p1 = note1.addParagraph();
+
+    Note note2  = notebook.createNote();
+    Paragraph p2 = note2.addParagraph();
+
+    p1.setText("getId");
+    p2.setText("getId");
+
+    // run per note session disabled
+    note1.run(p1.getId());
+    note2.run(p2.getId());
+
+    while (p1.getStatus() != Status.FINISHED) Thread.yield();
+    while (p2.getStatus() != Status.FINISHED) Thread.yield();
+
+    assertEquals(p1.getResult().message(), p2.getResult().message());
+
+
+    // restart interpreter with per note session enabled
+    for (InterpreterSetting setting : 
note1.getNoteReplLoader().getInterpreterSettings()) {
+      setting.getOption().setPerNoteSession(true);
+      notebook.getInterpreterFactory().restart(setting.id());
+    }
+
+    // run per note session enabled
+    note1.run(p1.getId());
+    note2.run(p2.getId());
+
+    while (p1.getStatus() != Status.FINISHED) Thread.yield();
+    while (p2.getStatus() != Status.FINISHED) Thread.yield();
+
+    assertNotEquals(p1.getResult().message(), p2.getResult().message());
+
+    notebook.removeNote(note1.getId());
+    notebook.removeNote(note2.getId());
+  }
+
+  @Test
+  public void testPerSessionInterpreterCloseOnUnbindInterpreterSetting() 
throws IOException {
+    // create a note
+    Note note1  = notebook.createNote();
+    Paragraph p1 = note1.addParagraph();
+    p1.setText("getId");
+
+    // restart interpreter with per note session enabled
+    for (InterpreterSetting setting : 
note1.getNoteReplLoader().getInterpreterSettings()) {
+      setting.getOption().setPerNoteSession(true);
+      notebook.getInterpreterFactory().restart(setting.id());
+    }
+
+    note1.run(p1.getId());
+    while (p1.getStatus() != Status.FINISHED) Thread.yield();
+    InterpreterResult result = p1.getResult();
+
+
+    // unbind and rebind the setting, which closes the interpreter instance
+    List<String> bindedSettings = 
notebook.getBindedInterpreterSettingsIds(note1.getId());
+    notebook.bindInterpretersToNote(note1.getId(), new LinkedList<String>());
+    notebook.bindInterpretersToNote(note1.getId(), bindedSettings);
+
+    note1.run(p1.getId());
+    while (p1.getStatus() != Status.FINISHED) Thread.yield();
+
+    assertNotEquals(result.message(), p1.getResult().message());
+
+    notebook.removeNote(note1.getId());
+  }
+
+
   private void delete(File file){
     if(file.isFile()) file.delete();
     else if(file.isDirectory()){

Reply via email to