[ZEPPELIN-513] Dedicated interpreter session per notebook

### What is this PR for?

Currently, all notebooks that binded to the same interpreter setting shares the 
same interpreter instance.
For example, let's say there're `spark-production` interpreter setting and all 
notebooks are using it.
Then all notebooks share a single instance of SparkInterpreter, and that result 
all the variables are being shared across the notebook.

But sometimes isolated name space for variables per notebook is preferred. In 
this case we can create per notebook interpreter instance.

This PR provides generalized facility to provides per notebook interpreter 
instance, not only for SparkInterpreter but for any other existing interpreter 
without modification.

**Design**

It provides toggle switch that user turn on / turn off this mode per 
interpreter setting.
When per notebook interpreter is turned on, it still creates a single remote 
interpreter process per interpreter setting. But inside of the process, each 
notebook will have it's own interpreter instance and the instance is terminated 
automatically on removal of notebook or unbinding note from interpreter setting.

**SparkInterpreter**

Because of the design keeps single remote interpreter process and creates 
multiple interpreter instance inside of it, SparkInterpreter become a problem. 
While multiple SparkContext can not be created on a single process. So this PR 
try to, create per notebook scala compiler instance, but share single 
SparkContext instance. In this way, each notebook can run job concurrently by 
leveraging fairscheduler of SparkContext without running multiple Spark 
application (SparkContext).

And this approach is inspired by 
https://github.com/piyush-mukati/incubator-zeppelin/tree/parallel_scheduler_support_spark,
 this PR trying to generalize it for all other interpreters.

### What type of PR is it?
Feature

### Todos
* [x] - Per note interpreter session
* [x] - Scala compiler instance per note and share a single SparkContext 
(SparkInterpreter)
* [x] - Handle Spark class server correctly
* [x] - Handle SparkInterpreter Progressbar correctly

### Is there a relevant Jira issue?

https://issues.apache.org/jira/browse/ZEPPELIN-513

### How should this be tested?

Check "Per note session" of your Spark interpreter setting.
And create two different notebook and check scala variables are not shared.

### Screenshots (if appropriate)

Checkbox to enable
![image](https://cloud.githubusercontent.com/assets/1540981/13270280/e6db3686-da41-11e5-9408-8eaa9c2a7350.png)

How it works for SparkInterpreter
![per_note_session](https://cloud.githubusercontent.com/assets/1540981/13135855/3c01f26c-d5c9-11e5-864b-30fc4e9f61d1.gif)

### Questions:
* Does the licenses files need update? no
* Is there breaking changes for older versions? no
* Does this needs documentation? yes

Author: Lee moon soo <[email protected]>

This patch had conflicts when merged, resolved by
Committer: Lee moon soo <[email protected]>

Closes #703 from Leemoonsoo/notebook_interpreter_session and squashes the 
following commits:

222066c [Lee moon soo] Use nanoTime
8287eb9 [Lee moon soo] Fix style
45af2df [Lee moon soo] Prevent infinitely waiting loop
438b212 [Lee moon soo] Fix typo on the comment
b091b7e [Lee moon soo] Move numReferenceOfSparkContext.incrementAndGet() to the 
end of open() method
4d2533f [Lee moon soo] Per note session -> Separate Interpreter for each note
7b073f6 [Lee moon soo] Merge branch 'master' into notebook_interpreter_session
feabb5f [Lee moon soo] Handle NPE when closing interpreters in group spark.
5047217 [Lee moon soo] Restore angularObject correctly
1f03ebb [Lee moon soo] Merge branch 'master' into notebook_interpreter_session
7730cf3 [Lee moon soo] Add document for per note session mode
dedf93f [Lee moon soo] Nullcheck
72b2235 [Lee moon soo] Protect SparkContext creation
f2299d6 [Lee moon soo] Handle scheduler termination correctly when unbind, bind 
interpreter
269c27d [Lee moon soo] Fix unittest
4299b87 [Lee moon soo] Share a SparkContext across ScalaCompilers
cc33c25 [Lee moon soo] Add more unittests
6dd981a [Lee moon soo] update interpreter module test
1d1638c [Lee moon soo] Fix test
58e5f91 [Lee moon soo] Update spark interpreter
366b651 [Lee moon soo] Don't destroy interpreter on note memove or change 
bindings when interpreter.option.perNoteSession is false
fc9cb3f [Lee moon soo] interpreter session aware interpreter factory
ed1ab0d [Lee moon soo] zeppelin-interpreter note session support
5787984 [Lee moon soo] Add option for per note session interpreter


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/b88f52e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/b88f52e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/b88f52e3

Branch: refs/heads/master
Commit: b88f52e3cf798c46d7e3b0ed3ea9f8bbd2b6d9d8
Parents: 738c10e
Author: Lee moon soo <[email protected]>
Authored: Wed Feb 24 20:01:05 2016 -0800
Committer: Lee moon soo <[email protected]>
Committed: Sun Feb 28 08:18:00 2016 -0800

----------------------------------------------------------------------
 .../img/screenshots/interpreter_binding.png     |  Bin 0 -> 232383 bytes
 .../img/screenshots/interpreter_persession.png  |  Bin 0 -> 37885 bytes
 docs/development/writingzeppelininterpreter.md  |    8 +-
 docs/interpreter/spark.md                       |    5 +
 docs/manual/interpreters.md                     |   17 +-
 .../apache/zeppelin/spark/DepInterpreter.java   |   28 +-
 .../zeppelin/spark/PySparkInterpreter.java      |   42 +-
 .../apache/zeppelin/spark/SparkInterpreter.java |  156 +-
 .../zeppelin/spark/SparkSqlInterpreter.java     |   36 +-
 .../zeppelin/spark/DepInterpreterTest.java      |    5 +-
 .../zeppelin/spark/SparkInterpreterTest.java    |   27 +-
 .../zeppelin/spark/SparkSqlInterpreterTest.java |   13 +-
 .../zeppelin/interpreter/Interpreter.java       |   31 +-
 .../zeppelin/interpreter/InterpreterGroup.java  |  125 +-
 .../interpreter/remote/RemoteAngularObject.java |   17 +-
 .../remote/RemoteAngularObjectRegistry.java     |   23 +-
 .../interpreter/remote/RemoteInterpreter.java   |  102 +-
 .../remote/RemoteInterpreterEventPoller.java    |    3 +-
 .../remote/RemoteInterpreterProcess.java        |    9 +
 .../remote/RemoteInterpreterServer.java         |   88 +-
 .../thrift/RemoteInterpreterService.java        | 1381 +++++++++++++++---
 .../zeppelin/scheduler/RemoteScheduler.java     |    6 +-
 .../zeppelin/scheduler/SchedulerFactory.java    |    2 +
 .../main/thrift/RemoteInterpreterService.thrift |   18 +-
 .../remote/RemoteAngularObjectTest.java         |    4 +-
 .../RemoteInterpreterOutputTestStream.java      |    5 +-
 .../remote/RemoteInterpreterTest.java           |   94 +-
 .../remote/mock/MockInterpreterB.java           |   38 +-
 .../resource/DistributedResourcePoolTest.java   |    8 +-
 .../zeppelin/scheduler/RemoteSchedulerTest.java |   13 +-
 .../zeppelin/rest/InterpreterRestApi.java       |    6 +-
 .../apache/zeppelin/rest/NotebookRestApi.java   |    4 +-
 .../InterpreterSettingListForNoteBind.java      |   11 +-
 .../message/NewInterpreterSettingRequest.java   |    8 +-
 .../UpdateInterpreterSettingRequest.java        |   10 +-
 .../zeppelin/server/JsonExclusionStrategy.java  |    5 +-
 .../apache/zeppelin/server/JsonResponse.java    |    8 +-
 .../zeppelin/rest/InterpreterRestApiTest.java   |    6 +-
 .../zeppelin/socket/NotebookServerTest.java     |   14 +-
 .../interpreter-create/interpreter-create.html  |    5 +
 .../app/interpreter/interpreter.controller.js   |   16 +-
 .../src/app/interpreter/interpreter.html        |   13 +
 .../interpreter/InterpreterFactory.java         |  170 ++-
 .../interpreter/InterpreterInfoSerializer.java  |   58 +
 .../zeppelin/interpreter/InterpreterOption.java |    9 +
 .../interpreter/InterpreterSerializer.java      |   56 -
 .../interpreter/InterpreterSetting.java         |   45 +-
 .../notebook/NoteInterpreterLoader.java         |   90 +-
 .../org/apache/zeppelin/notebook/Notebook.java  |    4 +-
 .../org/apache/zeppelin/notebook/Paragraph.java |   13 +-
 .../interpreter/InterpreterFactoryTest.java     |   21 +-
 .../interpreter/mock/MockInterpreter1.java      |    8 +-
 .../notebook/NoteInterpreterLoaderTest.java     |   41 +-
 .../apache/zeppelin/notebook/NotebookTest.java  |  116 +-
 54 files changed, 2345 insertions(+), 696 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/docs/assets/themes/zeppelin/img/screenshots/interpreter_binding.png
----------------------------------------------------------------------
diff --git 
a/docs/assets/themes/zeppelin/img/screenshots/interpreter_binding.png 
b/docs/assets/themes/zeppelin/img/screenshots/interpreter_binding.png
new file mode 100644
index 0000000..8b7bb05
Binary files /dev/null and 
b/docs/assets/themes/zeppelin/img/screenshots/interpreter_binding.png differ

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/docs/assets/themes/zeppelin/img/screenshots/interpreter_persession.png
----------------------------------------------------------------------
diff --git 
a/docs/assets/themes/zeppelin/img/screenshots/interpreter_persession.png 
b/docs/assets/themes/zeppelin/img/screenshots/interpreter_persession.png
new file mode 100644
index 0000000..bb81ed3
Binary files /dev/null and 
b/docs/assets/themes/zeppelin/img/screenshots/interpreter_persession.png differ

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/docs/development/writingzeppelininterpreter.md
----------------------------------------------------------------------
diff --git a/docs/development/writingzeppelininterpreter.md 
b/docs/development/writingzeppelininterpreter.md
index ced42f8..d9ab84b 100644
--- a/docs/development/writingzeppelininterpreter.md
+++ b/docs/development/writingzeppelininterpreter.md
@@ -22,12 +22,16 @@ limitations under the License.
 ### What is Zeppelin Interpreter
 
 Zeppelin Interpreter is a language backend. For example to use scala code in 
Zeppelin, you need scala interpreter.
-Every Interpreter belongs to an InterpreterGroup. InterpreterGroup is a unit 
of start/stop interpreter.
+Every Interpreter belongs to an InterpreterGroup. 
 Interpreters in the same InterpreterGroup can reference each other. For 
example, SparkSqlInterpreter can reference SparkInterpreter to get SparkContext 
from it while they're in the same group.
 
 <img class="img-responsive" style="width:50%; border: 1px solid #ecf0f1;" 
height="auto" src="/assets/themes/zeppelin/img/interpreter.png" />
 
-All Interpreters in the same interpreter group are launched in a single, 
separate JVM process. The Interpreter communicates with Zeppelin engine via 
thrift.
+InterpreterSetting is configuration of a given InterpreterGroup and a unit of 
start/stop interpreter.
+All Interpreters in the same InterpreterSetting are launched in a single, 
separate JVM process. The Interpreter communicates with Zeppelin engine via 
thrift.
+
+In 'Separate Interpreter for each note' mode, new Interpreter instance will be 
created per notebook. But it still runs on the same JVM while they're in the 
same InterpreterSettings.
+
 
 ### Make your own Interpreter
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/docs/interpreter/spark.md
----------------------------------------------------------------------
diff --git a/docs/interpreter/spark.md b/docs/interpreter/spark.md
index 844cbce..027d4b6 100644
--- a/docs/interpreter/spark.md
+++ b/docs/interpreter/spark.md
@@ -263,6 +263,11 @@ select * from ${table=defaultTableName} where text like 
'%${search}%'
 
 To learn more about dynamic form, checkout [Dynamic 
Form](../manual/dynamicform.html).
 
+
+### Separate Interpreter for each note
+
+In 'Separate Interpreter for each note' mode, SparkInterpreter creates scala 
compiler per each notebook. However it still shares the single SparkContext.
+
 ## Setting up Zeppelin with Kerberos
 Logical setup with Zeppelin, Kerberos Distribution Center (KDC), and Spark on 
YARN:
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/docs/manual/interpreters.md
----------------------------------------------------------------------
diff --git a/docs/manual/interpreters.md b/docs/manual/interpreters.md
index 8d37eac..f23600f 100644
--- a/docs/manual/interpreters.md
+++ b/docs/manual/interpreters.md
@@ -32,10 +32,16 @@ When you click the ```+Create``` button in the interpreter 
page, the interpreter
 <img src="/assets/themes/zeppelin/img/screenshots/interpreter_create.png">
 
 ## What is Zeppelin Interpreter Setting?
-Zeppelin interpreter setting is the configuration of a given interpreter on 
Zeppelin server. For example, the properties are required for hive JDBC 
interpreter to connect to the Hive server.
+Zeppelin interpreter setting is the configuration of a given interpreter on 
Zeppelin server. For example, the properties are required for hive JDBC 
interpreter to connect to the Hive server. 
 
 <img src="/assets/themes/zeppelin/img/screenshots/interpreter_setting.png">
 
+Each notebook can be binded to multiple Interpreter Settings using setting 
icon on upper right corner of the notebook.
+
+<img src="/assets/themes/zeppelin/img/screenshots/interpreter_binding.png" 
width="800px">
+
+
+
 ## What is Zeppelin Interpreter Group?
 Every Interpreter is belonged to an **Interpreter Group**. Interpreter Group 
is a unit of start/stop interpreter.
 By default, every interpreter is belonged to a single group, but the group 
might contain more interpreters. For example, Spark interpreter group is 
including Spark support, pySpark, SparkSQL and the dependency loader.
@@ -44,3 +50,12 @@ Technically, Zeppelin interpreters from the same group are 
running in the same J
 
 Each interpreters is belonged to a single group and registered together. All 
of their properties are listed in the interpreter setting like below image.
 <img 
src="/assets/themes/zeppelin/img/screenshots/interpreter_setting_spark.png">
+
+
+## Interpreter binding mode
+
+Each Interpreter Setting can choose one of two different interpreter binding 
mode.
+Shared mode (default) and 'Separate Interpreter for each note' mode. In shared 
mode, every notebook binded to the Interpreter Setting will share the single 
Interpreter instance. In 'Separate Interpreter for each note' mode, each 
notebook will create new Interpreter instance. Therefore each notebook will 
have fresh new Interpreter environment.
+
+<img src="/assets/themes/zeppelin/img/screenshots/interpreter_persession.png" 
width="400px">
+

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java 
b/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
index a4fdae3..5cfa764 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/DepInterpreter.java
@@ -299,23 +299,25 @@ public class DepInterpreter extends Interpreter {
     if (intpGroup == null) {
       return null;
     }
-    synchronized (intpGroup) {
-      for (Interpreter intp : intpGroup){
-        if (intp.getClassName().equals(SparkInterpreter.class.getName())) {
-          Interpreter p = intp;
-          while (p instanceof WrappedInterpreter) {
-            p = ((WrappedInterpreter) p).getInnerInterpreter();
-          }
-          return (SparkInterpreter) p;
-        }
-      }
+
+    Interpreter p = 
getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
+    if (p == null) {
+      return null;
+    }
+
+    while (p instanceof WrappedInterpreter) {
+      p = ((WrappedInterpreter) p).getInnerInterpreter();
     }
-    return null;
+    return (SparkInterpreter) p;
   }
 
   @Override
   public Scheduler getScheduler() {
-    return getSparkInterpreter().getScheduler();
+    SparkInterpreter sparkInterpreter = getSparkInterpreter();
+    if (sparkInterpreter != null) {
+      return getSparkInterpreter().getScheduler();
+    } else {
+      return null;
+    }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git 
a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java 
b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 2d4728d..152f70c 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -494,23 +494,18 @@ public class PySparkInterpreter extends Interpreter 
implements ExecuteResultHand
 
 
   private SparkInterpreter getSparkInterpreter() {
-    InterpreterGroup intpGroup = getInterpreterGroup();
     LazyOpenInterpreter lazy = null;
     SparkInterpreter spark = null;
-    synchronized (intpGroup) {
-      for (Interpreter intp : getInterpreterGroup()){
-        if (intp.getClassName().equals(SparkInterpreter.class.getName())) {
-          Interpreter p = intp;
-          while (p instanceof WrappedInterpreter) {
-            if (p instanceof LazyOpenInterpreter) {
-              lazy = (LazyOpenInterpreter) p;
-            }
-            p = ((WrappedInterpreter) p).getInnerInterpreter();
-          }
-          spark = (SparkInterpreter) p;
-        }
+    Interpreter p = 
getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
+
+    while (p instanceof WrappedInterpreter) {
+      if (p instanceof LazyOpenInterpreter) {
+        lazy = (LazyOpenInterpreter) p;
       }
+      p = ((WrappedInterpreter) p).getInnerInterpreter();
     }
+    spark = (SparkInterpreter) p;
+
     if (lazy != null) {
       lazy.open();
     }
@@ -554,20 +549,15 @@ public class PySparkInterpreter extends Interpreter 
implements ExecuteResultHand
   }
 
   private DepInterpreter getDepInterpreter() {
-    InterpreterGroup intpGroup = getInterpreterGroup();
-    if (intpGroup == null) return null;
-    synchronized (intpGroup) {
-      for (Interpreter intp : intpGroup) {
-        if (intp.getClassName().equals(DepInterpreter.class.getName())) {
-          Interpreter p = intp;
-          while (p instanceof WrappedInterpreter) {
-            p = ((WrappedInterpreter) p).getInnerInterpreter();
-          }
-          return (DepInterpreter) p;
-        }
-      }
+    Interpreter p = 
getInterpreterInTheSameSessionByClassName(DepInterpreter.class.getName());
+    if (p == null) {
+      return null;
+    }
+
+    while (p instanceof WrappedInterpreter) {
+      p = ((WrappedInterpreter) p).getInnerInterpreter();
     }
-    return null;
+    return (DepInterpreter) p;
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git 
a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java 
b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 1923186..5a1a0fd 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -20,11 +20,13 @@ package org.apache.zeppelin.spark;
 import java.io.File;
 import java.io.PrintWriter;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Joiner;
 
@@ -44,7 +46,6 @@ import org.apache.spark.ui.jobs.JobProgressListener;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
@@ -57,17 +58,15 @@ import 
org.apache.zeppelin.spark.dep.SparkDependencyResolver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.Console;
+import scala.*;
 import scala.Enumeration.Value;
-import scala.None;
-import scala.Some;
-import scala.Tuple2;
 import scala.collection.Iterator;
 import scala.collection.JavaConversions;
 import scala.collection.JavaConverters;
 import scala.collection.Seq;
 import scala.collection.mutable.HashMap;
 import scala.collection.mutable.HashSet;
+import scala.reflect.io.AbstractFile;
 import scala.tools.nsc.Settings;
 import scala.tools.nsc.interpreter.Completion.Candidates;
 import scala.tools.nsc.interpreter.Completion.ScalaCompleter;
@@ -113,16 +112,19 @@ public class SparkInterpreter extends Interpreter {
   private ZeppelinContext z;
   private SparkILoop interpreter;
   private SparkIMain intp;
-  private SparkContext sc;
+  private static SparkContext sc;
+  private static SQLContext sqlc;
+  private static SparkEnv env;
+  private static JobProgressListener sparkListener;
+  private static AbstractFile classOutputDir;
+  private static Integer sharedInterpreterLock = new Integer(0);
+  private static AtomicInteger numReferenceOfSparkContext = new 
AtomicInteger(0);
+
   private SparkOutputStream out;
-  private SQLContext sqlc;
   private SparkDependencyResolver dep;
   private SparkJLineCompletion completor;
 
-  private JobProgressListener sparkListener;
-
   private Map<String, Object> binder;
-  private SparkEnv env;
   private SparkVersion sparkVersion;
 
 
@@ -139,17 +141,21 @@ public class SparkInterpreter extends Interpreter {
     sparkListener = setupListeners(this.sc);
   }
 
-  public synchronized SparkContext getSparkContext() {
-    if (sc == null) {
-      sc = createSparkContext();
-      env = SparkEnv.get();
-      sparkListener = setupListeners(sc);
+  public SparkContext getSparkContext() {
+    synchronized (sharedInterpreterLock) {
+      if (sc == null) {
+        sc = createSparkContext();
+        env = SparkEnv.get();
+        sparkListener = setupListeners(sc);
+      }
+      return sc;
     }
-    return sc;
   }
 
   public boolean isSparkContextInitialized() {
-    return sc != null;
+    synchronized (sharedInterpreterLock) {
+      return sc != null;
+    }
   }
 
   static JobProgressListener setupListeners(SparkContext context) {
@@ -192,33 +198,34 @@ public class SparkInterpreter extends Interpreter {
   }
 
   private boolean useHiveContext() {
-    return Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext"));
+    return 
java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext"));
   }
 
   public SQLContext getSQLContext() {
-    if (sqlc == null) {
-      if (useHiveContext()) {
-        String name = "org.apache.spark.sql.hive.HiveContext";
-        Constructor<?> hc;
-        try {
-          hc = getClass().getClassLoader().loadClass(name)
-              .getConstructor(SparkContext.class);
-          sqlc = (SQLContext) hc.newInstance(getSparkContext());
-        } catch (NoSuchMethodException | SecurityException
-            | ClassNotFoundException | InstantiationException
-            | IllegalAccessException | IllegalArgumentException
-            | InvocationTargetException e) {
-          logger.warn("Can't create HiveContext. Fallback to SQLContext", e);
-          // when hive dependency is not loaded, it'll fail.
-          // in this case SQLContext can be used.
+    synchronized (sharedInterpreterLock) {
+      if (sqlc == null) {
+        if (useHiveContext()) {
+          String name = "org.apache.spark.sql.hive.HiveContext";
+          Constructor<?> hc;
+          try {
+            hc = getClass().getClassLoader().loadClass(name)
+                .getConstructor(SparkContext.class);
+            sqlc = (SQLContext) hc.newInstance(getSparkContext());
+          } catch (NoSuchMethodException | SecurityException
+              | ClassNotFoundException | InstantiationException
+              | IllegalAccessException | IllegalArgumentException
+              | InvocationTargetException e) {
+            logger.warn("Can't create HiveContext. Fallback to SQLContext", e);
+            // when hive dependency is not loaded, it'll fail.
+            // in this case SQLContext can be used.
+            sqlc = new SQLContext(getSparkContext());
+          }
+        } else {
           sqlc = new SQLContext(getSparkContext());
         }
-      } else {
-        sqlc = new SQLContext(getSparkContext());
       }
+      return sqlc;
     }
-
-    return sqlc;
   }
 
   public SparkDependencyResolver getDependencyResolver() {
@@ -232,20 +239,15 @@ public class SparkInterpreter extends Interpreter {
   }
 
   private DepInterpreter getDepInterpreter() {
-    InterpreterGroup intpGroup = getInterpreterGroup();
-    if (intpGroup == null) return null;
-    synchronized (intpGroup) {
-      for (Interpreter intp : intpGroup) {
-        if (intp.getClassName().equals(DepInterpreter.class.getName())) {
-          Interpreter p = intp;
-          while (p instanceof WrappedInterpreter) {
-            p = ((WrappedInterpreter) p).getInnerInterpreter();
-          }
-          return (DepInterpreter) p;
-        }
-      }
+    Interpreter p = 
getInterpreterInTheSameSessionByClassName(DepInterpreter.class.getName());
+    if (p == null) {
+      return null;
+    }
+
+    while (p instanceof WrappedInterpreter) {
+      p = ((WrappedInterpreter) p).getInnerInterpreter();
     }
-    return null;
+    return (DepInterpreter) p;
   }
 
   public SparkContext createSparkContext() {
@@ -477,7 +479,9 @@ public class SparkInterpreter extends Interpreter {
     b.v_$eq(true);
     
settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
 
-    /* spark interpreter */
+    System.setProperty("scala.repl.name.line", "line" + this.hashCode() + "$");
+
+    /* create scala repl */
     this.interpreter = new SparkILoop(null, new PrintWriter(out));
 
     interpreter.settings_$eq(settings);
@@ -488,20 +492,38 @@ public class SparkInterpreter extends Interpreter {
     intp.setContextClassLoader();
     intp.initializeSynchronous();
 
-    completor = new SparkJLineCompletion(intp);
+    synchronized (sharedInterpreterLock) {
+      if (classOutputDir == null) {
+        classOutputDir = settings.outputDirs().getSingleOutput().get();
+      } else {
+        // change SparkIMain class output dir
+        settings.outputDirs().setSingleOutput(classOutputDir);
+        ClassLoader cl = intp.classLoader();
 
-    sc = getSparkContext();
-    if (sc.getPoolForName("fair").isEmpty()) {
-      Value schedulingMode = org.apache.spark.scheduler.SchedulingMode.FAIR();
-      int minimumShare = 0;
-      int weight = 1;
-      Pool pool = new Pool("fair", schedulingMode, minimumShare, weight);
-      sc.taskScheduler().rootPool().addSchedulable(pool);
-    }
+        try {
+          Field rootField = 
cl.getClass().getSuperclass().getDeclaredField("root");
+          rootField.setAccessible(true);
+          rootField.set(cl, classOutputDir);
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+          logger.error(e.getMessage(), e);
+        }
+      }
 
-    sparkVersion = SparkVersion.fromVersionString(sc.version());
+      completor = new SparkJLineCompletion(intp);
 
-    sqlc = getSQLContext();
+      sc = getSparkContext();
+      if (sc.getPoolForName("fair").isEmpty()) {
+        Value schedulingMode = 
org.apache.spark.scheduler.SchedulingMode.FAIR();
+        int minimumShare = 0;
+        int weight = 1;
+        Pool pool = new Pool("fair", schedulingMode, minimumShare, weight);
+        sc.taskScheduler().rootPool().addSchedulable(pool);
+      }
+
+      sparkVersion = SparkVersion.fromVersionString(sc.version());
+
+      sqlc = getSQLContext();
+    }
 
     dep = getDependencyResolver();
 
@@ -594,6 +616,8 @@ public class SparkInterpreter extends Interpreter {
         }
       }
     }
+
+    numReferenceOfSparkContext.incrementAndGet();
   }
 
   private List<File> currentClassPath() {
@@ -916,8 +940,12 @@ public class SparkInterpreter extends Interpreter {
 
   @Override
   public void close() {
-    sc.stop();
-    sc = null;
+    logger.info("Close interpreter");
+
+    if (numReferenceOfSparkContext.decrementAndGet() == 0) {
+      sc.stop();
+      sc = null;
+    }
 
     intp.close();
   }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
----------------------------------------------------------------------
diff --git 
a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java 
b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index 82ebc9b..0be7c2d 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -79,23 +79,18 @@ public class SparkSqlInterpreter extends Interpreter {
   }
 
   private SparkInterpreter getSparkInterpreter() {
-    InterpreterGroup intpGroup = getInterpreterGroup();
     LazyOpenInterpreter lazy = null;
     SparkInterpreter spark = null;
-    synchronized (intpGroup) {
-      for (Interpreter intp : getInterpreterGroup()){
-        if (intp.getClassName().equals(SparkInterpreter.class.getName())) {
-          Interpreter p = intp;
-          while (p instanceof WrappedInterpreter) {
-            if (p instanceof LazyOpenInterpreter) {
-              lazy = (LazyOpenInterpreter) p;
-            }
-            p = ((WrappedInterpreter) p).getInnerInterpreter();
-          }
-          spark = (SparkInterpreter) p;
-        }
+    Interpreter p = 
getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
+
+    while (p instanceof WrappedInterpreter) {
+      if (p instanceof LazyOpenInterpreter) {
+        lazy = (LazyOpenInterpreter) p;
       }
+      p = ((WrappedInterpreter) p).getInnerInterpreter();
     }
+    spark = (SparkInterpreter) p;
+
     if (lazy != null) {
       lazy.open();
     }
@@ -179,15 +174,14 @@ public class SparkSqlInterpreter extends Interpreter {
       // It's because of scheduler is not created yet, and scheduler is 
created by this function.
       // Therefore, we can still use getSparkInterpreter() here, but it's 
better and safe
       // to getSparkInterpreter without opening it.
-      for (Interpreter intp : getInterpreterGroup()) {
-        if (intp.getClassName().equals(SparkInterpreter.class.getName())) {
-          Interpreter p = intp;
-          return p.getScheduler();
-        } else {
-          continue;
-        }
+
+      Interpreter intp =
+          
getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
+      if (intp != null) {
+        return intp.getScheduler();
+      } else {
+        return null;
       }
-      throw new InterpreterException("Can't find SparkInterpreter");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java 
b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
index 11b9328..dc8fd4c 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
@@ -52,8 +52,9 @@ public class DepInterpreterTest {
     dep.open();
 
     InterpreterGroup intpGroup = new InterpreterGroup();
-    intpGroup.add(new SparkInterpreter(p));
-    intpGroup.add(dep);
+    intpGroup.put("note", new LinkedList<Interpreter>());
+    intpGroup.get("note").add(new SparkInterpreter(p));
+    intpGroup.get("note").add(dep);
     dep.setInterpreterGroup(intpGroup);
 
     context = new InterpreterContext("note", "id", "title", "text", new 
AuthenticationInfo(),

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java 
b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index 17e844d..bb026d9 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -24,6 +24,8 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Properties;
 
+import org.apache.spark.HttpServer;
+import org.apache.spark.SecurityManager;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.zeppelin.display.AngularObjectRegistry;
@@ -42,11 +44,11 @@ import org.slf4j.LoggerFactory;
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class SparkInterpreterTest {
   public static SparkInterpreter repl;
+  public static InterpreterGroup intpGroup;
   private InterpreterContext context;
   private File tmpDir;
   public static Logger LOGGER = 
LoggerFactory.getLogger(SparkInterpreterTest.class);
 
-
   /**
    * Get spark version number as a numerical value.
    * eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
@@ -70,12 +72,14 @@ public class SparkInterpreterTest {
 
     if (repl == null) {
       Properties p = new Properties();
-
+      intpGroup = new InterpreterGroup();
+      intpGroup.put("note", new LinkedList<Interpreter>());
       repl = new SparkInterpreter(p);
+      repl.setInterpreterGroup(intpGroup);
+      intpGroup.get("note").add(repl);
       repl.open();
     }
 
-    InterpreterGroup intpGroup = new InterpreterGroup();
     context = new InterpreterContext("note", "id", "title", "text",
         new AuthenticationInfo(),
         new HashMap<String, Object>(),
@@ -188,4 +192,21 @@ public class SparkInterpreterTest {
       }
     }
   }
+
+  @Test
+  public void shareSingleSparkContext() throws InterruptedException {
+    // create another SparkInterpreter
+    Properties p = new Properties();
+    SparkInterpreter repl2 = new SparkInterpreter(p);
+    repl2.setInterpreterGroup(intpGroup);
+    intpGroup.get("note").add(repl2);
+    repl2.open();
+
+    assertEquals(Code.SUCCESS,
+        repl.interpret("print(sc.parallelize(1 to 10).count())", 
context).code());
+    assertEquals(Code.SUCCESS,
+        repl2.interpret("print(sc.parallelize(1 to 10).count())", 
context).code());
+
+    repl2.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java 
b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index a95461f..e5ea9a0 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -51,17 +51,22 @@ public class SparkSqlInterpreterTest {
 
       if (SparkInterpreterTest.repl == null) {
         repl = new SparkInterpreter(p);
+        intpGroup = new InterpreterGroup();
+        repl.setInterpreterGroup(intpGroup);
         repl.open();
         SparkInterpreterTest.repl = repl;
+        SparkInterpreterTest.intpGroup = intpGroup;
       } else {
         repl = SparkInterpreterTest.repl;
+        intpGroup = SparkInterpreterTest.intpGroup;
       }
 
-    sql = new SparkSqlInterpreter(p);
+      sql = new SparkSqlInterpreter(p);
 
-    intpGroup = new InterpreterGroup();
-      intpGroup.add(repl);
-      intpGroup.add(sql);
+      intpGroup = new InterpreterGroup();
+      intpGroup.put("note", new LinkedList<Interpreter>());
+      intpGroup.get("note").add(repl);
+      intpGroup.get("note").add(sql);
       sql.setInterpreterGroup(intpGroup);
       sql.open();
     }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
index a8c4508..c845478 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java
@@ -121,10 +121,6 @@ public abstract class Interpreter {
    * Called when interpreter is no longer used.
    */
   public void destroy() {
-    Scheduler scheduler = getScheduler();
-    if (scheduler != null) {
-      scheduler.stop();
-    }
   }
 
   public static Logger logger = LoggerFactory.getLogger(Interpreter.class);
@@ -193,6 +189,33 @@ public abstract class Interpreter {
     this.classloaderUrls = classloaderUrls;
   }
 
+  public Interpreter getInterpreterInTheSameSessionByClassName(String 
className) {
+    synchronized (interpreterGroup) {
+      for (List<Interpreter> interpreters : interpreterGroup.values()) {
+        boolean belongsToSameNoteGroup = false;
+        Interpreter interpreterFound = null;
+        for (Interpreter intp : interpreters) {
+          if (intp.getClassName().equals(className)) {
+            interpreterFound = intp;
+          }
+
+          Interpreter p = intp;
+          while (p instanceof WrappedInterpreter) {
+            p = ((WrappedInterpreter) p).getInnerInterpreter();
+          }
+          if (this == p) {
+            belongsToSameNoteGroup = true;
+          }
+        }
+
+        if (belongsToSameNoteGroup) {
+          return interpreterFound;
+        }
+      }
+    }
+    return null;
+  }
+
 
   /**
    * Type of interpreter.

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
index 4d450be..3ed988a 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
@@ -23,13 +23,24 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.log4j.Logger;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
 import org.apache.zeppelin.resource.ResourcePool;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
 
 /**
- * InterpreterGroup is list of interpreters in the same group.
+ * InterpreterGroup is list of interpreters in the same interpreter group.
+ * For example spark, pyspark, sql interpreters are in the same 'spark' group
+ * and InterpreterGroup will have reference to these all interpreters.
+ *
+ * Remember, list of interpreters are dedicated to a note.
+ * (when InterpreterOption.perNoteSession==true)
+ * So InterpreterGroup internally manages map of [noteId, list of interpreters]
+ *
+ * A InterpreterGroup runs on interpreter process.
  * And unit of interpreter instantiate, restart, bind, unbind.
  */
-public class InterpreterGroup extends LinkedList<Interpreter>{
+public class InterpreterGroup extends ConcurrentHashMap<String, 
List<Interpreter>> {
   String id;
 
   Logger LOGGER = Logger.getLogger(InterpreterGroup.class);
@@ -38,10 +49,14 @@ public class InterpreterGroup extends 
LinkedList<Interpreter>{
   RemoteInterpreterProcess remoteInterpreterProcess;    // attached remote 
interpreter process
   ResourcePool resourcePool;
 
+  // map [notebook session, Interpreters in the group], to support per note 
session interpreters
+  //Map<String, List<Interpreter>> interpreters = new ConcurrentHashMap<String,
+  // List<Interpreter>>();
+
   private static final Map<String, InterpreterGroup> allInterpreterGroups =
       new ConcurrentHashMap<String, InterpreterGroup>();
 
-  public static InterpreterGroup get(String id) {
+  public static InterpreterGroup getByInterpreterGroupId(String id) {
     return allInterpreterGroups.get(id);
   }
 
@@ -49,11 +64,18 @@ public class InterpreterGroup extends 
LinkedList<Interpreter>{
     return new LinkedList(allInterpreterGroups.values());
   }
 
+  /**
+   * Create InterpreterGroup with given id
+   * @param id
+   */
   public InterpreterGroup(String id) {
     this.id = id;
     allInterpreterGroups.put(id, this);
   }
 
+  /**
+   * Create InterpreterGroup with autogenerated id
+   */
   public InterpreterGroup() {
     getId();
     allInterpreterGroups.put(id, this);
@@ -73,10 +95,22 @@ public class InterpreterGroup extends 
LinkedList<Interpreter>{
     }
   }
 
+  /**
+   * Get combined property of all interpreters in this group
+   * @return
+   */
   public Properties getProperty() {
     Properties p = new Properties();
-    for (Interpreter intp : this) {
-      p.putAll(intp.getProperty());
+
+    Collection<List<Interpreter>> intpGroupForANote = this.values();
+    if (intpGroupForANote != null && intpGroupForANote.size() > 0) {
+      for (List<Interpreter> intpGroup : intpGroupForANote) {
+        for (Interpreter intp : intpGroup) {
+          p.putAll(intp.getProperty());
+        }
+        // it's okay to break here while every List<Interpreters> will have 
the same property set
+        break;
+      }
     }
     return p;
   }
@@ -97,13 +131,45 @@ public class InterpreterGroup extends 
LinkedList<Interpreter>{
     this.remoteInterpreterProcess = remoteInterpreterProcess;
   }
 
+
+
+  /**
+   * Close all interpreter instances in this group
+   */
   public void close() {
+    LOGGER.info("Close interpreter group " + getId());
+    List<Interpreter> intpToClose = new LinkedList<Interpreter>();
+    for (List<Interpreter> intpGroupForNote : this.values()) {
+      intpToClose.addAll(intpGroupForNote);
+    }
+    close(intpToClose);
+  }
+
+  /**
+   * Close all interpreter instances in this group for the note
+   * @param noteId
+   */
+  public void close(String noteId) {
+    LOGGER.info("Close interpreter group " + getId() + " for note " + noteId);
+    List<Interpreter> intpForNote = this.get(noteId);
+    close(intpForNote);
+  }
+
+  private void close(Collection<Interpreter> intpToClose) {
+    if (intpToClose == null) {
+      return;
+    }
     List<Thread> closeThreads = new LinkedList<Thread>();
 
-    for (final Interpreter intp : this) {
+    for (final Interpreter intp : intpToClose) {
       Thread t = new Thread() {
         public void run() {
+          Scheduler scheduler = intp.getScheduler();
           intp.close();
+
+          if (scheduler != null) {
+            SchedulerFactory.singleton().removeScheduler(scheduler.getName());
+          }
         }
       };
 
@@ -120,10 +186,46 @@ public class InterpreterGroup extends 
LinkedList<Interpreter>{
     }
   }
 
+  /**
+   * Destroy all interpreter instances in this group for the note
+   * @param noteId
+   */
+  public void destroy(String noteId) {
+    LOGGER.info("Destroy interpreter group " + getId() + " for note " + 
noteId);
+    List<Interpreter> intpForNote = this.get(noteId);
+    destroy(intpForNote);
+  }
+
+
+  /**
+   * Destroy all interpreter instances in this group
+   */
   public void destroy() {
+    LOGGER.info("Destroy interpreter group " + getId());
+    List<Interpreter> intpToDestroy = new LinkedList<Interpreter>();
+    for (List<Interpreter> intpGroupForNote : this.values()) {
+      intpToDestroy.addAll(intpGroupForNote);
+    }
+    destroy(intpToDestroy);
+
+    // make sure remote interpreter process terminates
+    if (remoteInterpreterProcess != null) {
+      while (remoteInterpreterProcess.referenceCount() > 0) {
+        remoteInterpreterProcess.dereference();
+      }
+    }
+
+    allInterpreterGroups.remove(id);
+  }
+
+  private void destroy(Collection<Interpreter> intpToDestroy) {
+    if (intpToDestroy == null) {
+      return;
+    }
+
     List<Thread> destroyThreads = new LinkedList<Thread>();
 
-    for (final Interpreter intp : this) {
+    for (final Interpreter intp : intpToDestroy) {
       Thread t = new Thread() {
         public void run() {
           intp.destroy();
@@ -141,16 +243,9 @@ public class InterpreterGroup extends 
LinkedList<Interpreter>{
         LOGGER.error("Can't close interpreter", e);
       }
     }
+  }
 
-    // make sure remote interpreter process terminates
-    if (remoteInterpreterProcess != null) {
-      while (remoteInterpreterProcess.referenceCount() > 0) {
-        remoteInterpreterProcess.dereference();
-      }
-    }
 
-    allInterpreterGroups.remove(id);
-  }
 
   public void setResourcePool(ResourcePool resourcePool) {
     this.resourcePool = resourcePool;

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java
index 8948b4e..c1f9b94 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java
@@ -19,20 +19,20 @@ package org.apache.zeppelin.interpreter.remote;
 
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectListener;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
 
 /**
  * Proxy for AngularObject that exists in remote interpreter process
  */
 public class RemoteAngularObject extends AngularObject {
 
-  private transient RemoteInterpreterProcess remoteInterpreterProcess;
+  private transient InterpreterGroup interpreterGroup;
 
-  RemoteAngularObject(String name, Object o, String noteId, String 
paragraphId, String
-          interpreterGroupId,
-      AngularObjectListener listener,
-      RemoteInterpreterProcess remoteInterpreterProcess) {
+  RemoteAngularObject(String name, Object o, String noteId, String paragraphId,
+      InterpreterGroup interpreterGroup,
+      AngularObjectListener listener) {
     super(name, o, noteId, paragraphId, listener);
-    this.remoteInterpreterProcess = remoteInterpreterProcess;
+    this.interpreterGroup = interpreterGroup;
   }
 
   @Override
@@ -45,8 +45,9 @@ public class RemoteAngularObject extends AngularObject {
 
     if (emitRemoteProcess) {
       // send updated value to remote interpreter
-      remoteInterpreterProcess.updateRemoteAngularObject(getName(), 
getNoteId(), getParagraphId()
-              , o);
+      interpreterGroup.getRemoteInterpreterProcess().
+          updateRemoteAngularObject(
+              getName(), getNoteId(), getParagraphId(), o);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
index 790ed95..3789292 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
@@ -47,19 +47,7 @@ public class RemoteAngularObjectRegistry extends 
AngularObjectRegistry {
   }
 
   private RemoteInterpreterProcess getRemoteInterpreterProcess() {
-    if (interpreterGroup.size() == 0) {
-      throw new RuntimeException("Can't get remoteInterpreterProcess");
-    }
-    Interpreter p = interpreterGroup.get(0);
-    while (p instanceof WrappedInterpreter) {
-      p = ((WrappedInterpreter) p).getInnerInterpreter();
-    }
-
-    if (p instanceof RemoteInterpreter) {
-      return ((RemoteInterpreter) p).getInterpreterProcess();
-    } else {
-      throw new RuntimeException("Can't get remoteInterpreterProcess");
-    }
+    return interpreterGroup.getRemoteInterpreterProcess();
   }
 
   /**
@@ -141,12 +129,7 @@ public class RemoteAngularObjectRegistry extends 
AngularObjectRegistry {
   @Override
   protected AngularObject createNewAngularObject(String name, Object o, String 
noteId, String
           paragraphId) {
-    RemoteInterpreterProcess remoteInterpreterProcess = 
getRemoteInterpreterProcess();
-    if (remoteInterpreterProcess == null) {
-      throw new RuntimeException("Remote Interpreter process not found");
-    }
-    return new RemoteAngularObject(name, o, noteId, paragraphId, 
getInterpreterGroupId(),
-        getAngularObjectListener(),
-        getRemoteInterpreterProcess());
+    return new RemoteAngularObject(name, o, noteId, paragraphId, 
interpreterGroup,
+        getAngularObjectListener());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index b1eb458..e4d4bff 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -17,19 +17,11 @@
 
 package org.apache.zeppelin.interpreter.remote;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 
 import org.apache.thrift.TException;
 import org.apache.zeppelin.display.GUI;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.InterpreterResult.Type;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
@@ -43,7 +35,7 @@ import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 
 /**
- *
+ * Proxy for Interpreter instance that runs on separate process
  */
 public class RemoteInterpreter extends Interpreter {
   private final RemoteInterpreterProcessListener 
remoteInterpreterProcessListener;
@@ -53,13 +45,16 @@ public class RemoteInterpreter extends Interpreter {
   private String interpreterPath;
   private String localRepoPath;
   private String className;
+  private String noteId;
   FormType formType;
   boolean initialized;
   private Map<String, String> env;
   private int connectTimeout;
   private int maxPoolSize;
+  private static String schedulerName;
 
   public RemoteInterpreter(Properties property,
+      String noteId,
       String className,
       String interpreterRunner,
       String interpreterPath,
@@ -68,6 +63,7 @@ public class RemoteInterpreter extends Interpreter {
       int maxPoolSize,
       RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
     super(property);
+    this.noteId = noteId;
     this.className = className;
     initialized = false;
     this.interpreterRunner = interpreterRunner;
@@ -80,6 +76,7 @@ public class RemoteInterpreter extends Interpreter {
   }
 
   public RemoteInterpreter(Properties property,
+      String noteId,
       String className,
       String interpreterRunner,
       String interpreterPath,
@@ -89,6 +86,7 @@ public class RemoteInterpreter extends Interpreter {
       RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
     super(property);
     this.className = className;
+    this.noteId = noteId;
     this.interpreterRunner = interpreterRunner;
     this.interpreterPath = interpreterPath;
     this.localRepoPath = localRepoPath;
@@ -123,39 +121,36 @@ public class RemoteInterpreter extends Interpreter {
     }
   }
 
-  private synchronized void init() {
+  public synchronized void init() {
     if (initialized == true) {
       return;
     }
 
     RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    int rc = interpreterProcess.reference(getInterpreterGroup());
-    interpreterProcess.setMaxPoolSize(this.maxPoolSize);
+
+    interpreterProcess.reference(getInterpreterGroup());
+    interpreterProcess.setMaxPoolSize(
+        Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
+
     synchronized (interpreterProcess) {
-      // when first process created
-      if (rc == 1) {
-        // create all interpreter class in this interpreter group
-        Client client = null;
-        try {
-          client = interpreterProcess.getClient();
-        } catch (Exception e1) {
-          throw new InterpreterException(e1);
-        }
+      Client client = null;
+      try {
+        client = interpreterProcess.getClient();
+      } catch (Exception e1) {
+        throw new InterpreterException(e1);
+      }
 
-        boolean broken = false;
-        try {
-          for (Interpreter intp : this.getInterpreterGroup()) {
-            logger.info("Create remote interpreter {}", intp.getClassName());
-            property.put("zeppelin.interpreter.localRepo", localRepoPath);
-            client.createInterpreter(getInterpreterGroup().getId(),
-                    intp.getClassName(), (Map) property);
-          }
-        } catch (TException e) {
-          broken = true;
-          throw new InterpreterException(e);
-        } finally {
-          interpreterProcess.releaseClient(client, broken);
-        }
+      boolean broken = false;
+      try {
+        logger.info("Create remote interpreter {}", getClassName());
+        property.put("zeppelin.interpreter.localRepo", localRepoPath);
+        client.createInterpreter(getInterpreterGroup().getId(), noteId,
+            getClassName(), (Map) property);
+      } catch (TException e) {
+        broken = true;
+        throw new InterpreterException(e);
+      } finally {
+        interpreterProcess.releaseClient(client, broken);
       }
     }
     initialized = true;
@@ -165,19 +160,31 @@ public class RemoteInterpreter extends Interpreter {
 
   @Override
   public void open() {
-    init();
+    InterpreterGroup interpreterGroup = getInterpreterGroup();
+
+    synchronized (interpreterGroup) {
+      // initialize all interpreters in this interpreter group
+      List<Interpreter> interpreters = interpreterGroup.get(noteId);
+      for (Interpreter intp : interpreters) {
+        Interpreter p = intp;
+        while (p instanceof WrappedInterpreter) {
+          p = ((WrappedInterpreter) p).getInnerInterpreter();
+        }
+        ((RemoteInterpreter) p).init();
+      }
+    }
   }
 
   @Override
   public void close() {
     RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
-    Client client = null;
 
+    Client client = null;
     boolean broken = false;
     try {
       client = interpreterProcess.getClient();
       if (client != null) {
-        client.close(className);
+        client.close(noteId, className);
       }
     } catch (TException e) {
       broken = true;
@@ -219,7 +226,8 @@ public class RemoteInterpreter extends Interpreter {
     boolean broken = false;
     try {
       GUI settings = context.getGui();
-      RemoteInterpreterResult remoteResult = client.interpret(className, st, 
convert(context));
+      RemoteInterpreterResult remoteResult = client.interpret(
+          noteId, className, st, convert(context));
 
       Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
           remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
@@ -256,7 +264,7 @@ public class RemoteInterpreter extends Interpreter {
 
     boolean broken = false;
     try {
-      client.cancel(className, convert(context));
+      client.cancel(noteId, className, convert(context));
     } catch (TException e) {
       broken = true;
       throw new InterpreterException(e);
@@ -284,7 +292,7 @@ public class RemoteInterpreter extends Interpreter {
 
     boolean broken = false;
     try {
-      formType = FormType.valueOf(client.getFormType(className));
+      formType = FormType.valueOf(client.getFormType(noteId, className));
       return formType;
     } catch (TException e) {
       broken = true;
@@ -310,7 +318,7 @@ public class RemoteInterpreter extends Interpreter {
 
     boolean broken = false;
     try {
-      return client.getProgress(className, convert(context));
+      return client.getProgress(noteId, className, convert(context));
     } catch (TException e) {
       broken = true;
       throw new InterpreterException(e);
@@ -332,7 +340,7 @@ public class RemoteInterpreter extends Interpreter {
 
     boolean broken = false;
     try {
-      return client.completion(className, buf, cursor);
+      return client.completion(noteId, className, buf, cursor);
     } catch (TException e) {
       broken = true;
       throw new InterpreterException(e);
@@ -349,7 +357,9 @@ public class RemoteInterpreter extends Interpreter {
       return null;
     } else {
       return SchedulerFactory.singleton().createOrGetRemoteScheduler(
-          "remoteinterpreter_" + interpreterProcess.hashCode(), 
interpreterProcess,
+          RemoteInterpreter.class.getName() + noteId + 
interpreterProcess.hashCode(),
+          noteId,
+          interpreterProcess,
           maxConcurrency);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
index be28bbf..8600c78 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -244,7 +244,8 @@ public class RemoteInterpreterEventPoller extends Thread {
   }
 
   private Object getResource(ResourceId resourceId) {
-    InterpreterGroup intpGroup = 
InterpreterGroup.get(resourceId.getResourcePoolId());
+    InterpreterGroup intpGroup = InterpreterGroup.getByInterpreterGroupId(
+        resourceId.getResourcePoolId());
     if (intpGroup == null) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index 9a2d503..67a048b 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -281,6 +281,15 @@ public class RemoteInterpreterProcess implements 
ExecuteResultHandler {
       clientPool.setMaxTotal(size + 2);
     }
   }
+
+  public int getMaxPoolSize() {
+    if (clientPool != null) {
+      return clientPool.getMaxTotal();
+    } else {
+      return 0;
+    }
+  }
+
   /**
    * Called when angular object is updated in client side to propagate
    * change to the remote process

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b88f52e3/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 3174484..09ef391 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -23,11 +23,7 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URL;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 
 import org.apache.thrift.TException;
 import org.apache.thrift.server.TThreadPoolServer;
@@ -140,10 +136,9 @@ public class RemoteInterpreterServer
 
 
   @Override
-  public void createInterpreter(String interpreterGroupId, String className, 
Map<String, String>
-          properties)
-      throws TException {
-
+  public void createInterpreter(String interpreterGroupId, String noteId, 
String
+      className,
+                                Map<String, String> properties) throws 
TException {
     if (interpreterGroup == null) {
       interpreterGroup = new InterpreterGroup(interpreterGroupId);
       angularObjectRegistry = new 
AngularObjectRegistry(interpreterGroup.getId(), this);
@@ -165,8 +160,13 @@ public class RemoteInterpreterServer
       repl.setClassloaderUrls(new URL[]{});
 
       synchronized (interpreterGroup) {
-        interpreterGroup.add(new LazyOpenInterpreter(
-            new ClassloaderInterpreter(repl, cl)));
+        List<Interpreter> interpreters = interpreterGroup.get(noteId);
+        if (interpreters == null) {
+          interpreters = new LinkedList<Interpreter>();
+          interpreterGroup.put(noteId, interpreters);
+        }
+
+        interpreters.add(new LazyOpenInterpreter(new 
ClassloaderInterpreter(repl, cl)));
       }
 
       logger.info("Instantiate interpreter {}", className);
@@ -179,9 +179,18 @@ public class RemoteInterpreterServer
     }
   }
 
-  private Interpreter getInterpreter(String className) throws TException {
+  private Interpreter getInterpreter(String noteId, String className) throws 
TException {
+    if (interpreterGroup == null) {
+      throw new TException(
+          new InterpreterException("Interpreter instance " + className + " not 
created"));
+    }
     synchronized (interpreterGroup) {
-      for (Interpreter inp : interpreterGroup) {
+      List<Interpreter> interpreters = interpreterGroup.get(noteId);
+      if (interpreters == null) {
+        throw new TException(
+            new InterpreterException("Interpreter " + className + " not 
initialized"));
+      }
+      for (Interpreter inp : interpreters) {
         if (inp.getClassName().equals(className)) {
           return inp;
         }
@@ -192,23 +201,35 @@ public class RemoteInterpreterServer
   }
 
   @Override
-  public void open(String className) throws TException {
-    Interpreter intp = getInterpreter(className);
+  public void open(String noteId, String className) throws TException {
+    Interpreter intp = getInterpreter(noteId, className);
     intp.open();
   }
 
   @Override
-  public void close(String className) throws TException {
-    Interpreter intp = getInterpreter(className);
-    intp.close();
+  public void close(String noteId, String className) throws TException {
+    synchronized (interpreterGroup) {
+      List<Interpreter> interpreters = interpreterGroup.get(noteId);
+      if (interpreters != null) {
+        Iterator<Interpreter> it = interpreters.iterator();
+        while (it.hasNext()) {
+          Interpreter inp = it.next();
+          if (inp.getClassName().equals(className)) {
+            inp.close();
+            it.remove();
+            break;
+          }
+        }
+      }
+    }
   }
 
 
   @Override
-  public RemoteInterpreterResult interpret(String className, String st,
+  public RemoteInterpreterResult interpret(String noteId, String className, 
String st,
       RemoteInterpreterContext interpreterContext) throws TException {
     logger.debug("st: {}", st);
-    Interpreter intp = getInterpreter(className);
+    Interpreter intp = getInterpreter(noteId, className);
     InterpreterContext context = convert(interpreterContext);
 
     Scheduler scheduler = intp.getScheduler();
@@ -341,10 +362,10 @@ public class RemoteInterpreterServer
 
 
   @Override
-  public void cancel(String className, RemoteInterpreterContext 
interpreterContext)
+  public void cancel(String noteId, String className, RemoteInterpreterContext 
interpreterContext)
       throws TException {
     logger.info("cancel {} {}", className, 
interpreterContext.getParagraphId());
-    Interpreter intp = getInterpreter(className);
+    Interpreter intp = getInterpreter(noteId, className);
     String jobId = interpreterContext.getParagraphId();
     Job job = intp.getScheduler().removeFromWaitingQueue(jobId);
 
@@ -356,22 +377,24 @@ public class RemoteInterpreterServer
   }
 
   @Override
-  public int getProgress(String className, RemoteInterpreterContext 
interpreterContext)
+  public int getProgress(String noteId, String className,
+                         RemoteInterpreterContext interpreterContext)
       throws TException {
-    Interpreter intp = getInterpreter(className);
+    Interpreter intp = getInterpreter(noteId, className);
     return intp.getProgress(convert(interpreterContext));
   }
 
 
   @Override
-  public String getFormType(String className) throws TException {
-    Interpreter intp = getInterpreter(className);
+  public String getFormType(String noteId, String className) throws TException 
{
+    Interpreter intp = getInterpreter(noteId, className);
     return intp.getFormType().toString();
   }
 
   @Override
-  public List<String> completion(String className, String buf, int cursor) 
throws TException {
-    Interpreter intp = getInterpreter(className);
+  public List<String> completion(String noteId, String className, String buf, 
int cursor)
+      throws TException {
+    Interpreter intp = getInterpreter(noteId, className);
     return intp.completion(buf, cursor);
   }
 
@@ -441,14 +464,19 @@ public class RemoteInterpreterServer
   }
 
   @Override
-  public String getStatus(String jobId)
+  public String getStatus(String noteId, String jobId)
       throws TException {
     if (interpreterGroup == null) {
       return "Unknown";
     }
 
     synchronized (interpreterGroup) {
-      for (Interpreter intp : interpreterGroup) {
+      List<Interpreter> interpreters = interpreterGroup.get(noteId);
+      if (interpreters == null) {
+        return "Unknown";
+      }
+
+      for (Interpreter intp : interpreters) {
         for (Job job : intp.getScheduler().getJobsRunning()) {
           if (jobId.equals(job.getId())) {
             return job.getStatus().name();

Reply via email to