Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master 218a3b5bc -> ddf2c89ec


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ByteBufferInputStream.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ByteBufferInputStream.java
new file mode 100644
index 0000000..a8becb4
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ByteBufferInputStream.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * {@link InputStream} that reads the remaining bytes of a {@link ByteBuffer}.
+ *
+ * Reading through an instance of this class advances the position of the wrapped
+ * buffer; end of stream is reported once the buffer has no bytes remaining.
+ */
+public class ByteBufferInputStream extends InputStream {
+
+  ByteBuffer buf;
+
+  /**
+   * @param buf buffer to read from; its position moves forward as bytes are consumed
+   */
+  public ByteBufferInputStream(ByteBuffer buf) {
+    this.buf = buf;
+  }
+
+  /**
+   * Read the next byte of the buffer.
+   * @return the byte as an unsigned value (0-255), or -1 when the buffer is exhausted
+   */
+  @Override
+  public int read() throws IOException {
+    if (!buf.hasRemaining()) {
+      return -1;
+    }
+    return buf.get() & 0xFF;
+  }
+
+  /**
+   * Copy up to {@code len} bytes of the buffer into {@code bytes}, starting at {@code off}.
+   * @param bytes destination array
+   * @param off offset in the destination at which the first byte is stored
+   * @param len maximum number of bytes to copy
+   * @return number of bytes copied, 0 when {@code len} is 0,
+   *         or -1 when the buffer is exhausted
+   */
+  @Override
+  public int read(byte[] bytes, int off, int len) throws IOException {
+    // InputStream contract: a zero-length read returns 0, even at end of stream
+    if (len == 0) {
+      return 0;
+    }
+    if (!buf.hasRemaining()) {
+      return -1;
+    }
+    int count = Math.min(len, buf.remaining());
+    buf.get(bytes, off, count);
+    return count;
+  }
+
+  /**
+   * Create an InputStream over the bytes between the position and the limit of buf.
+   *
+   * For a buffer with an accessible backing array the stream reads that array directly
+   * and does not move the position of buf; otherwise a {@link ByteBufferInputStream}
+   * is returned, which consumes buf while it is read.
+   * @param buf buffer holding the data, must not be null
+   * @return stream over the remaining bytes of buf
+   */
+  public static InputStream get(ByteBuffer buf) {
+    if (buf.hasArray()) {
+      // array() exposes the whole backing array, so restrict the stream to the
+      // window described by arrayOffset, position and limit
+      return new ByteArrayInputStream(
+          buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
+    } else {
+      return new ByteBufferInputStream(buf);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java
new file mode 100644
index 0000000..3f03b92
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+/**
+ * Resource pool that serves its own local resources and can additionally consult
+ * the resources listed by a {@link ResourcePoolConnector}.
+ */
+public class DistributedResourcePool extends LocalResourcePool {
+
+  private final ResourcePoolConnector connector;
+
+  /**
+   * @param id id passed on to the local pool
+   * @param connector used to list the resources of other pools
+   */
+  public DistributedResourcePool(String id, ResourcePoolConnector connector) {
+    super(id);
+    this.connector = connector;
+  }
+
+  /**
+   * Look up a resource by name, consulting the connector when it is not found locally.
+   * @param name resource name
+   * @return null if resource not found
+   */
+  @Override
+  public Resource get(String name) {
+    return get(name, true);
+  }
+
+  /**
+   * Look up a resource by name. The local pool is always checked first.
+   * @param name resource name
+   * @param remote when false, only the local pool is searched
+   * @return the local resource if present, otherwise the first resource with that name
+   *         reported by the connector (only when remote is true); null if none is found
+   */
+  public Resource get(String name, boolean remote) {
+    Resource local = super.get(name);
+    if (local != null || !remote) {
+      // either a local hit, or a local miss (null) with the remote lookup disabled
+      return local;
+    }
+
+    ResourceSet matches = connector.getAllResources().filterByName(name);
+    return matches.isEmpty() ? null : matches.get(0);
+  }
+
+  /**
+   * List the local resources together with the ones reported by the connector.
+   */
+  @Override
+  public ResourceSet getAll() {
+    return getAll(true);
+  }
+
+  /**
+   * List the resources of this pool.
+   * @param remote when false, only local resources are returned
+   * @return local resources, followed by the connector's resources when remote is true
+   */
+  public ResourceSet getAll(boolean remote) {
+    ResourceSet merged = super.getAll();
+    if (!remote) {
+      return merged;
+    }
+    merged.addAll(connector.getAllResources());
+    return merged;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/LocalResourcePool.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/LocalResourcePool.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/LocalResourcePool.java
new file mode 100644
index 0000000..cc5f7e9
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/LocalResourcePool.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+import java.util.*;
+
+/**
+ * ResourcePool that keeps its resources in a synchronized in-memory map.
+ */
+public class LocalResourcePool implements ResourcePool {
+  private final String resourcePoolId;
+  private final Map<ResourceId, Resource> resources =
+      Collections.synchronizedMap(new HashMap<ResourceId, Resource>());
+
+  /**
+   * @param id unique id
+   */
+  public LocalResourcePool(String id) {
+    resourcePoolId = id;
+  }
+
+  /**
+   * Get unique id of this resource pool
+   * @return the id given at construction
+   */
+  @Override
+  public String id() {
+    return resourcePoolId;
+  }
+
+  /**
+   * Get the resource stored under the given name
+   * @param name resource name
+   * @return null if resource not found
+   */
+  @Override
+  public Resource get(String name) {
+    return resources.get(keyFor(name));
+  }
+
+  /**
+   * Get all resources of this pool
+   * @return a new ResourceSet holding the resources currently stored
+   */
+  @Override
+  public ResourceSet getAll() {
+    return new ResourceSet(resources.values());
+  }
+
+  /**
+   * Put resource into the pool, replacing any resource stored under the same name
+   * @param name name to store the resource under
+   * @param object object to put into the resource
+   */
+  @Override
+  public void put(String name, Object object) {
+    ResourceId key = keyFor(name);
+    resources.put(key, new Resource(key, object));
+  }
+
+  /**
+   * Remove the resource stored under the given name
+   * @param name resource name
+   * @return the removed resource, null if resource not found
+   */
+  @Override
+  public Resource remove(String name) {
+    return resources.remove(keyFor(name));
+  }
+
+  /**
+   * Build the map key for a resource name within this pool
+   */
+  private ResourceId keyFor(String name) {
+    return new ResourceId(resourcePoolId, name);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java
new file mode 100644
index 0000000..5a8a9ea
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteResource.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+/**
+ * Resource whose value is fetched through a {@link ResourcePoolConnector}
+ * instead of being held locally
+ */
+public class RemoteResource extends Resource {
+  ResourcePoolConnector resourcePoolConnector;
+
+  RemoteResource(ResourceId resourceId, Object r) {
+    super(resourceId, r);
+  }
+
+  RemoteResource(ResourceId resourceId, boolean serializable, String className) {
+    super(resourceId, serializable, className);
+  }
+
+  /**
+   * Read the value through the connector.
+   * @return the object returned by the connector, or null when the resource
+   *         is not serializable
+   */
+  @Override
+  public Object get() {
+    if (!isSerializable()) {
+      return null;
+    }
+    // the connector must have been set via setResourcePoolConnector beforehand
+    return resourcePoolConnector.readResource(getResourceId());
+  }
+
+  /**
+   * @return always false
+   */
+  @Override
+  public boolean isLocal() {
+    return false;
+  }
+
+  public ResourcePoolConnector getResourcePoolConnector() {
+    return resourcePoolConnector;
+  }
+
+  public void setResourcePoolConnector(ResourcePoolConnector resourcePoolConnector) {
+    this.resourcePoolConnector = resourcePoolConnector;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java
new file mode 100644
index 0000000..6988b3e
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/Resource.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+
+/**
+ * Information and reference to the resource
+ */
+public class Resource {
+  private final transient Object r;
+  private final boolean serializable;
+  private final ResourceId resourceId;
+  private final String className;
+
+
+  /**
+   * Create local resource. Whether it is serializable and its class name
+   * are derived from the given object.
+   * @param resourceId id of the resource
+   * @param r must not be null
+   */
+  Resource(ResourceId resourceId, Object r) {
+    this.r = r;
+    this.resourceId = resourceId;
+    this.serializable = r instanceof Serializable;
+    this.className = r.getClass().getName();
+  }
+
+  /**
+   * Create remote object. No value is held; only its description is kept.
+   * @param resourceId id of the resource
+   * @param serializable whether the value can be serialized
+   * @param className class name of the value
+   */
+  Resource(ResourceId resourceId, boolean serializable, String className) {
+    this.r = null;
+    this.resourceId = resourceId;
+    this.serializable = serializable;
+    this.className = className;
+  }
+
+  public ResourceId getResourceId() {
+    return resourceId;
+  }
+
+  public String getClassName() {
+    return className;
+  }
+
+  /**
+   *
+   * @return null when this is remote resource and not serializable.
+   */
+  public Object get() {
+    if (isLocal() || isSerializable()){
+      return r;
+    } else {
+      return null;
+    }
+  }
+
+  public boolean isSerializable() {
+    return serializable;
+  }
+
+  /**
+   * if it is remote object
+   * @return the negation of {@link #isLocal()}
+   */
+  public boolean isRemote() {
+    return !isLocal();
+  }
+
+  /**
+   * Whether it is locally accessible or not
+   * @return true in this class; subclasses may override
+   */
+  public boolean isLocal() {
+    return true;
+  }
+
+
+
+  /**
+   * Serialize an object with Java serialization.
+   * @param o object to serialize
+   * @return buffer wrapping the serialized bytes, or null when o is null
+   *         or does not implement Serializable
+   * @throws IOException when writing the object fails, e.g. because it references
+   *         a non-serializable object; a partially written buffer is never returned
+   */
+  public static ByteBuffer serializeObject(Object o) throws IOException {
+    // instanceof is false for null, so this also covers o == null
+    if (!(o instanceof Serializable)) {
+      return null;
+    }
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    ObjectOutputStream oos = new ObjectOutputStream(out);
+    try {
+      oos.writeObject(o);
+    } finally {
+      // close() flushes the remaining data into 'out' before it is read below
+      oos.close();
+    }
+    return ByteBuffer.wrap(out.toByteArray());
+  }
+
+  /**
+   * Deserialize an object written by {@link #serializeObject(Object)}.
+   * @param buf serialized data
+   * @return the deserialized object, or null when buf is null
+   * @throws IOException when the data can not be read
+   * @throws ClassNotFoundException when the class of the object is not available
+   */
+  public static Object deserializeObject(ByteBuffer buf)
+      throws IOException, ClassNotFoundException {
+    if (buf == null) {
+      return null;
+    }
+    // NOTE(review): Java deserialization is unsafe on untrusted data - confirm that
+    // buf only ever originates from trusted processes
+    InputStream ins = ByteBufferInputStream.get(buf);
+    try {
+      ObjectInputStream oin = new ObjectInputStream(ins);
+      try {
+        return oin.readObject();
+      } finally {
+        oin.close();
+      }
+    } finally {
+      // also reached when the ObjectInputStream could not be created
+      ins.close();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceId.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceId.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceId.java
new file mode 100644
index 0000000..a0d55e3
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceId.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+/**
+ * Identifies a resource by the id of its resource pool and its name.
+ * Two ids are equal when both the pool id and the name are equal.
+ */
+public class ResourceId {
+  private final String resourcePoolId;
+  private final String name;
+
+  /**
+   * @param resourcePoolId id of the pool the resource belongs to
+   * @param name name of the resource
+   */
+  ResourceId(String resourcePoolId, String name) {
+    this.resourcePoolId = resourcePoolId;
+    this.name = name;
+  }
+
+  public String getResourcePoolId() {
+    return resourcePoolId;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public int hashCode() {
+    // hash the two fields separately: hashing their concatenation made pairs such
+    // as ("ab", "c") and ("a", "bc") collide by construction
+    int result = resourcePoolId == null ? 0 : resourcePoolId.hashCode();
+    return 31 * result + (name == null ? 0 : name.hashCode());
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof ResourceId)) {
+      return false;
+    }
+    ResourceId other = (ResourceId) o;
+    return sameText(name, other.name) && sameText(resourcePoolId, other.resourcePoolId);
+  }
+
+  /**
+   * Null-safe string comparison, so that equals tolerates null fields like hashCode does
+   */
+  private static boolean sameText(String a, String b) {
+    return a == null ? b == null : a.equals(b);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePool.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePool.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePool.java
new file mode 100644
index 0000000..6328b8d
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePool.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+/**
+ * Contract of a pool that stores objects as named resources
+ */
+public interface ResourcePool {
+  /**
+   * @return unique id of the resource pool
+   */
+  String id();
+
+  /**
+   * Look up a resource by its name
+   * @param name Resource name
+   * @return null if resource not found
+   */
+  Resource get(String name);
+
+  /**
+   * @return all resources of the pool
+   */
+  ResourceSet getAll();
+
+  /**
+   * Store an object in the resource pool
+   * @param name name to store the object under
+   * @param object object to store
+   */
+  void put(String name, Object object);
+
+  /**
+   * Remove the resource stored under the given name
+   * @param name Resource name to remove
+   * @return removed Resource. null if resource not found
+   */
+  Resource remove(String name);
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolConnector.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolConnector.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolConnector.java
new file mode 100644
index 0000000..af343db
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolConnector.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+/**
+ * Link to the resource pools running in remote processes
+ */
+public interface ResourcePoolConnector {
+  /**
+   * List the resources of all other resource pools in remote processes
+   * @return the resources found
+   */
+  ResourceSet getAllResources();
+
+  /**
+   * Read the value of a remote resource
+   * @param id id of the resource to read
+   * @return the object read
+   */
+  Object readResource(ResourceId id);
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSet.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSet.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSet.java
new file mode 100644
index 0000000..a03655b
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSet.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.regex.Pattern;
+
+/**
+ * List of resources, with helpers that select a subset by resource name or class name.
+ * Every filter returns a new ResourceSet and leaves this one untouched.
+ */
+public class ResourceSet extends LinkedList<Resource> {
+
+  /**
+   * @param resources initial content, copied into the new set
+   */
+  public ResourceSet(Collection<Resource> resources) {
+    super(resources);
+  }
+
+  public ResourceSet() {
+    super();
+  }
+
+  /**
+   * Select the resources whose name matches the regular expression entirely.
+   * @param regex regular expression; it is compiled once per call, so an invalid
+   *              expression is reported even when the set is empty
+   * @return matching resources, in their original order
+   */
+  public ResourceSet filterByNameRegex(String regex) {
+    // compile once instead of once per element, as Pattern.matches would
+    Pattern pattern = Pattern.compile(regex);
+    ResourceSet result = new ResourceSet();
+    for (Resource r : this) {
+      if (pattern.matcher(r.getResourceId().getName()).matches()) {
+        result.add(r);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Select the resources whose name equals the given name.
+   * @param name exact resource name
+   * @return matching resources, in their original order
+   */
+  public ResourceSet filterByName(String name) {
+    ResourceSet result = new ResourceSet();
+    for (Resource r : this) {
+      if (r.getResourceId().getName().equals(name)) {
+        result.add(r);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Select the resources whose class name matches the regular expression entirely.
+   * @param regex regular expression; it is compiled once per call, so an invalid
+   *              expression is reported even when the set is empty
+   * @return matching resources, in their original order
+   */
+  public ResourceSet filterByClassnameRegex(String regex) {
+    // compile once instead of once per element, as Pattern.matches would
+    Pattern pattern = Pattern.compile(regex);
+    ResourceSet result = new ResourceSet();
+    for (Resource r : this) {
+      if (pattern.matcher(r.getClassName()).matches()) {
+        result.add(r);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Select the resources whose class name equals the given class name.
+   * @param className exact class name
+   * @return matching resources, in their original order
+   */
+  public ResourceSet filterByClassname(String className) {
+    ResourceSet result = new ResourceSet();
+    for (Resource r : this) {
+      if (r.getClassName().equals(className)) {
+        result.add(r);
+      }
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift 
b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
index 5cd14a2..3d6a62e 100644
--- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
+++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
@@ -43,8 +43,10 @@ enum RemoteInterpreterEventType {
   ANGULAR_OBJECT_UPDATE = 3,
   ANGULAR_OBJECT_REMOVE = 4,
   RUN_INTERPRETER_CONTEXT_RUNNER = 5,
-  OUTPUT_APPEND = 6,
-  OUTPUT_UPDATE = 7
+  RESOURCE_POOL_GET_ALL = 6,
+  RESOURCE_GET = 7
+  OUTPUT_APPEND = 8,
+  OUTPUT_UPDATE = 9
 }
 
 struct RemoteInterpreterEvent {
@@ -53,7 +55,7 @@ struct RemoteInterpreterEvent {
 }
 
 service RemoteInterpreterService {
-  void createInterpreter(1: string className, 2: map<string, string> 
properties);
+  void createInterpreter(1: string intpGroupId, 2: string className, 3: 
map<string, string> properties);
 
   void open(1: string className);
   void close(1: string className);
@@ -67,8 +69,18 @@ service RemoteInterpreterService {
   string getStatus(1:string jobId);
 
   RemoteInterpreterEvent getEvent();
+
+  // as a response, ZeppelinServer sends the list of resources to the Interpreter process
+  void resourcePoolResponseGetAll(1: list<string> resources);
+  // as a response, ZeppelinServer sends the serialized value of the resource
+  void resourceResponseGet(1: string resourceId, 2: binary object);
+  // get all resources in the interpreter process
+  list<string> resoucePoolGetAll();
+  // get value of resource
+  binary resourceGet(1: string resourceName);
+
   void angularObjectUpdate(1: string name, 2: string noteId, 3: string 
paragraphId, 4: string
   object);
   void angularObjectAdd(1: string name, 2: string noteId, 3: string 
paragraphId, 4: string object);
   void angularObjectRemove(1: string name, 2: string noteId, 3: string 
paragraphId);
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
index 9c2732d..40fd2ed 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
@@ -27,7 +27,7 @@ public class InterpreterContextTest {
   public void testThreadLocal() {
     assertNull(InterpreterContext.get());
 
-    InterpreterContext.set(new InterpreterContext(null, null, null, null, 
null, null, null, null, null));
+    InterpreterContext.set(new InterpreterContext(null, null, null, null, 
null, null, null, null, null, null));
     assertNotNull(InterpreterContext.get());
 
     InterpreterContext.remove();

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
index 385b9d6..b6801e4 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java
@@ -34,6 +34,8 @@ import 
org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterAngular;
+import org.apache.zeppelin.resource.LocalResourcePool;
+import org.apache.zeppelin.resource.ResourcePool;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -85,6 +87,7 @@ public class RemoteAngularObjectTest implements 
AngularObjectRegistryListener {
         new HashMap<String, Object>(),
         new GUI(),
         new AngularObjectRegistry(intpGroup.getId(), null),
+        new LocalResourcePool("pool1"),
         new LinkedList<InterpreterContextRunner>(), null);
 
     intp.open();
@@ -93,7 +96,7 @@ public class RemoteAngularObjectTest implements 
AngularObjectRegistryListener {
   @After
   public void tearDown() throws Exception {
     intp.close();
-    intpGroup.clone();
+    intpGroup.close();
     intpGroup.destroy();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
index f229f6b..7ebe597 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java
@@ -71,14 +71,15 @@ public class RemoteInterpreterOutputTestStream implements 
RemoteInterpreterProce
 
   private InterpreterContext createInterpreterContext() {
     return new InterpreterContext(
-            "noteId",
-            "id",
-            "title",
-            "text",
-            new HashMap<String, Object>(),
-            new GUI(),
-            new AngularObjectRegistry(intpGroup.getId(), null),
-            new LinkedList<InterpreterContextRunner>(), null);
+        "noteId",
+        "id",
+        "title",
+        "text",
+        new HashMap<String, Object>(),
+        new GUI(),
+        new AngularObjectRegistry(intpGroup.getId(), null),
+        null,
+        new LinkedList<InterpreterContextRunner>(), null);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
index 82ca8d4..4af9ba4 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
@@ -37,6 +37,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
 import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterB;
+import org.apache.zeppelin.resource.LocalResourcePool;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.scheduler.Scheduler;
@@ -123,6 +124,7 @@ public class RemoteInterpreterTest {
             new HashMap<String, Object>(),
             new GUI(),
             new AngularObjectRegistry(intpGroup.getId(), null),
+            new LocalResourcePool("pool1"),
             new LinkedList<InterpreterContextRunner>(), null));
 
     intpB.open();
@@ -156,6 +158,7 @@ public class RemoteInterpreterTest {
             new HashMap<String, Object>(),
             new GUI(),
             new AngularObjectRegistry(intpGroup.getId(), null),
+            new LocalResourcePool("pool1"),
             new LinkedList<InterpreterContextRunner>(), null));
 
     assertEquals(Code.ERROR, ret.code());
@@ -204,6 +207,7 @@ public class RemoteInterpreterTest {
             new HashMap<String, Object>(),
             new GUI(),
             new AngularObjectRegistry(intpGroup.getId(), null),
+            new LocalResourcePool("pool1"),
             new LinkedList<InterpreterContextRunner>(), null));
     assertEquals("500", ret.message());
 
@@ -216,6 +220,7 @@ public class RemoteInterpreterTest {
             new HashMap<String, Object>(),
             new GUI(),
             new AngularObjectRegistry(intpGroup.getId(), null),
+            new LocalResourcePool("pool1"),
             new LinkedList<InterpreterContextRunner>(), null));
     assertEquals("1000", ret.message());
     long end = System.currentTimeMillis();
@@ -267,6 +272,7 @@ public class RemoteInterpreterTest {
                 new HashMap<String, Object>(),
                 new GUI(),
                 new AngularObjectRegistry(intpGroup.getId(), null),
+                new LocalResourcePool("pool1"),
                 new LinkedList<InterpreterContextRunner>(), null));
       }
 
@@ -301,6 +307,7 @@ public class RemoteInterpreterTest {
                 new HashMap<String, Object>(),
                 new GUI(),
                 new AngularObjectRegistry(intpGroup.getId(), null),
+                new LocalResourcePool("pool1"),
                 new LinkedList<InterpreterContextRunner>(), null));
       }
 
@@ -366,6 +373,7 @@ public class RemoteInterpreterTest {
               new HashMap<String, Object>(),
               new GUI(),
               new AngularObjectRegistry(intpGroup.getId(), null),
+              new LocalResourcePool("pool1"),
               new LinkedList<InterpreterContextRunner>(), null));
 
           synchronized (results) {
@@ -443,6 +451,7 @@ public class RemoteInterpreterTest {
               new HashMap<String, Object>(),
               new GUI(),
               new AngularObjectRegistry(intpGroup.getId(), null),
+              new LocalResourcePool("pool1"),
               new LinkedList<InterpreterContextRunner>(), null));
 
           synchronized (results) {
@@ -541,6 +550,7 @@ public class RemoteInterpreterTest {
                 new HashMap<String, Object>(),
                 new GUI(),
                 new AngularObjectRegistry(intpGroup.getId(), null),
+                new LocalResourcePool("pool1"),
                 new LinkedList<InterpreterContextRunner>(), null));
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
new file mode 100644
index 0000000..1db68ad
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote.mock;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.gson.Gson;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectWatcher;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.resource.ResourcePool;
+
+public class MockInterpreterResourcePool extends Interpreter {
+  static {
+    Interpreter.register(
+        "resourcePoolTest",
+        "resourcePool",
+        MockInterpreterA.class.getName(),
+        new InterpreterPropertyBuilder()
+            .add("p1", "v1", "property1").build());
+
+  }
+
+  AtomicInteger numWatch = new AtomicInteger(0);
+
+  public MockInterpreterResourcePool(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    String[] stmt = st.split(" ");
+    String cmd = stmt[0];
+    String name = null;
+    if (stmt.length >= 2) {
+      name = stmt[1];
+    }
+    String value = null;
+    if (stmt.length == 3) {
+      value = stmt[2];
+    }
+
+    ResourcePool resourcePool = context.getResourcePool();
+    Object ret = null;
+    if (cmd.equals("put")) {
+      resourcePool.put(name, value);
+    } else if (cmd.equalsIgnoreCase("get")) {
+      ret = resourcePool.get(name).get();
+    } else if (cmd.equals("remove")) {
+      ret = resourcePool.remove(name);
+    } else if (cmd.equals("getAll")) {
+      ret = resourcePool.getAll();
+    }
+
+    try {
+      Thread.sleep(500); // wait for watcher executed
+    } catch (InterruptedException e) {
+    }
+
+    Gson gson = new Gson();
+    return new InterpreterResult(Code.SUCCESS, gson.toJson(ret));
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<String> completion(String buf, int cursor) {
+    return null;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
new file mode 100644
index 0000000..bedaa02
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/DistributedResourcePoolTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+import com.google.gson.Gson;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
+import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterResourcePool;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unittest for DistributedResourcePool
+ */
+public class DistributedResourcePoolTest {
+  private InterpreterGroup intpGroup1;
+  private InterpreterGroup intpGroup2;
+  private HashMap<String, String> env;
+  private RemoteInterpreter intp1;
+  private RemoteInterpreter intp2;
+  private InterpreterContext context;
+  private RemoteInterpreterEventPoller eventPoller1;
+  private RemoteInterpreterEventPoller eventPoller2;
+
+
+  @Before
+  public void setUp() throws Exception {
+    env = new HashMap<String, String>();
+    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
+
+    Properties p = new Properties();
+
+    intp1 = new RemoteInterpreter(
+        p,
+        MockInterpreterResourcePool.class.getName(),
+        new File("../bin/interpreter.sh").getAbsolutePath(),
+        "fake",
+        "fakeRepo",
+        env,
+        10 * 1000,
+        null
+    );
+
+    intpGroup1 = new InterpreterGroup("intpGroup1");
+    intpGroup1.add(intp1);
+    intp1.setInterpreterGroup(intpGroup1);
+
+    intp2 = new RemoteInterpreter(
+        p,
+        MockInterpreterResourcePool.class.getName(),
+        new File("../bin/interpreter.sh").getAbsolutePath(),
+        "fake",
+        "fakeRepo",        
+        env,
+        10 * 1000,
+        null
+    );
+
+    intpGroup2 = new InterpreterGroup("intpGroup2");
+    intpGroup2.add(intp2);
+    intp2.setInterpreterGroup(intpGroup2);
+
+    context = new InterpreterContext(
+        "note",
+        "id",
+        "title",
+        "text",
+        new HashMap<String, Object>(),
+        new GUI(),
+        null,
+        null,
+        new LinkedList<InterpreterContextRunner>(),
+        null);
+
+    intp1.open();
+    intp2.open();
+
+    eventPoller1 = new RemoteInterpreterEventPoller(null);
+    eventPoller1.setInterpreterGroup(intpGroup1);
+    eventPoller1.setInterpreterProcess(intpGroup1.getRemoteInterpreterProcess());
+
+    eventPoller2 = new RemoteInterpreterEventPoller(null);
+    eventPoller2.setInterpreterGroup(intpGroup2);
+    eventPoller2.setInterpreterProcess(intpGroup2.getRemoteInterpreterProcess());
+
+    eventPoller1.start();
+    eventPoller2.start();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    eventPoller1.shutdown();
+    intp1.close();
+    intpGroup1.close();
+    intpGroup1.destroy();
+    eventPoller2.shutdown();
+    intp2.close();
+    intpGroup2.close();
+    intpGroup2.destroy();
+  }
+
+  @Test
+  public void testRemoteDistributedResourcePool() {
+    Gson gson = new Gson();
+    InterpreterResult ret;
+    intp1.interpret("put key1 value1", context);
+    intp2.interpret("put key2 value2", context);
+
+    ret = intp1.interpret("getAll", context);
+    assertEquals(2, gson.fromJson(ret.message(), ResourceSet.class).size());
+
+    ret = intp2.interpret("getAll", context);
+    assertEquals(2, gson.fromJson(ret.message(), ResourceSet.class).size());
+
+    ret = intp1.interpret("get key1", context);
+    assertEquals("value1", gson.fromJson(ret.message(), String.class));
+
+    ret = intp1.interpret("get key2", context);
+    assertEquals("value2", gson.fromJson(ret.message(), String.class));
+  }
+
+  @Test
+  public void testDistributedResourcePool() {
+    final LocalResourcePool pool2 = new LocalResourcePool("pool2");
+    final LocalResourcePool pool3 = new LocalResourcePool("pool3");
+
+    DistributedResourcePool pool1 = new DistributedResourcePool("pool1", new ResourcePoolConnector() {
+      @Override
+      public ResourceSet getAllResources() {
+        ResourceSet set = pool2.getAll();
+        set.addAll(pool3.getAll());
+
+        ResourceSet remoteSet = new ResourceSet();
+        Gson gson = new Gson();
+        for (Resource s : set) {
+          RemoteResource remoteResource = gson.fromJson(gson.toJson(s), RemoteResource.class);
+          remoteResource.setResourcePoolConnector(this);
+          remoteSet.add(remoteResource);
+        }
+        return remoteSet;
+      }
+
+      @Override
+      public Object readResource(ResourceId id) {
+        if (id.getResourcePoolId().equals(pool2.id())) {
+          return pool2.get(id.getName()).get();
+        }
+        if (id.getResourcePoolId().equals(pool3.id())) {
+          return pool3.get(id.getName()).get();
+        }
+        return null;
+      }
+    });
+
+    assertEquals(0, pool1.getAll().size());
+
+
+    // test get() can get from pool
+    pool2.put("object1", "value2");
+    assertEquals(1, pool1.getAll().size());
+    assertTrue(pool1.get("object1").isRemote());
+    assertEquals("value2", pool1.get("object1").get());
+
+    // test get() is locality aware
+    pool1.put("object1", "value1");
+    assertEquals(1, pool2.getAll().size());
+    assertEquals("value1", pool1.get("object1").get());
+
+    // test getAll() is locality aware
+    assertEquals("value1", pool1.getAll().get(0).get());
+    assertEquals("value2", pool1.getAll().get(1).get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/LocalResourcePoolTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/LocalResourcePoolTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/LocalResourcePoolTest.java
new file mode 100644
index 0000000..65d284b
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/LocalResourcePoolTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unittest for LocalResourcePool
+ */
+public class LocalResourcePoolTest {
+
+  @Test
+  public void testGetPutResourcePool() {
+
+    LocalResourcePool pool = new LocalResourcePool("pool1");
+    assertEquals("pool1", pool.id());
+
+    assertNull(pool.get("notExists"));
+    pool.put("item1", "value1");
+    Resource resource = pool.get("item1");
+    assertNotNull(resource);
+    assertEquals(pool.id(), resource.getResourceId().getResourcePoolId());
+    assertEquals("value1", resource.get());
+    assertTrue(resource.isLocal());
+    assertTrue(resource.isSerializable());
+
+    assertEquals(1, pool.getAll().size());
+
+    assertNotNull(pool.remove("item1"));
+    assertNull(pool.remove("item1"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceSetTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceSetTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceSetTest.java
new file mode 100644
index 0000000..ca64525
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceSetTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit test for ResourceSet
+ */
+public class ResourceSetTest {
+
+  @Test
+  public void testFilterByName() {
+    ResourceSet set = new ResourceSet();
+
+    set.add(new Resource(new ResourceId("poo1", "resource1"), "value1"));
+    set.add(new Resource(new ResourceId("poo1", "resource2"), new Integer(2)));
+    assertEquals(2, set.filterByNameRegex(".*").size());
+    assertEquals(1, set.filterByNameRegex("resource1").size());
+    assertEquals(1, set.filterByNameRegex("resource2").size());
+    assertEquals(0, set.filterByNameRegex("res").size());
+    assertEquals(2, set.filterByNameRegex("res.*").size());
+  }
+
+  @Test
+  public void testFilterByClassName() {
+    ResourceSet set = new ResourceSet();
+
+    set.add(new Resource(new ResourceId("poo1", "resource1"), "value1"));
+    set.add(new Resource(new ResourceId("poo1", "resource2"), new Integer(2)));
+
+    assertEquals(1, set.filterByClassnameRegex(".*String").size());
+    assertEquals(1, set.filterByClassnameRegex(String.class.getName()).size());
+    assertEquals(1, set.filterByClassnameRegex(".*Integer").size());
+    assertEquals(1, set.filterByClassnameRegex(Integer.class.getName()).size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java
new file mode 100644
index 0000000..fb8b271
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/resource/ResourceTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.resource;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for Resource
+ */
+public class ResourceTest {
+  @Test
+  public void testSerializeDeserialize() throws IOException, ClassNotFoundException {
+    ByteBuffer buffer = Resource.serializeObject("hello");
+    assertEquals("hello", Resource.deserializeObject(buffer));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
index d46b8cf..2bdcd4f 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java
@@ -34,13 +34,15 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
 import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
+import org.apache.zeppelin.resource.LocalResourcePool;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-public class RemoteSchedulerTest {
+public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
 
   private SchedulerFactory schedulerSvc;
   private static final int TICK_WAIT = 100;
@@ -71,7 +73,7 @@ public class RemoteSchedulerTest {
         "fakeRepo",
         env,
         10 * 1000,
-        null);
+        this);
 
     intpGroup.add(intpA);
     intpA.setInterpreterGroup(intpGroup);
@@ -104,6 +106,7 @@ public class RemoteSchedulerTest {
             new HashMap<String, Object>(),
             new GUI(),
             new AngularObjectRegistry(intpGroup.getId(), null),
+            new LocalResourcePool("pool1"),
             new LinkedList<InterpreterContextRunner>(), null));
         return "1000";
       }
@@ -155,7 +158,7 @@ public class RemoteSchedulerTest {
         "fakeRepo",
         env,
         10 * 1000,
-        null);
+        this);
 
     intpGroup.add(intpA);
     intpA.setInterpreterGroup(intpGroup);
@@ -175,6 +178,7 @@ public class RemoteSchedulerTest {
           new HashMap<String, Object>(),
           new GUI(),
           new AngularObjectRegistry(intpGroup.getId(), null),
+          new LocalResourcePool("pool1"),
           new LinkedList<InterpreterContextRunner>(), null);
 
       @Override
@@ -211,6 +215,7 @@ public class RemoteSchedulerTest {
           new HashMap<String, Object>(),
           new GUI(),
           new AngularObjectRegistry(intpGroup.getId(), null),
+          new LocalResourcePool("pool1"),
           new LinkedList<InterpreterContextRunner>(), null);
 
       @Override
@@ -270,4 +275,13 @@ public class RemoteSchedulerTest {
     schedulerSvc.removeScheduler("test");
   }
 
+  @Override
+  public void onOutputAppend(String noteId, String paragraphId, String output) {
+
+  }
+
+  @Override
+  public void onOutputUpdated(String noteId, String paragraphId, String output) {
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index 7a87c92..df80cd0 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -493,9 +493,12 @@ public class Notebook {
       
       boolean releaseResource = false;
       try {
-        releaseResource = (boolean) note.getConfig().get("releaseresource");
-      } catch (java.lang.ClassCastException e) {
-        logger.error(e.toString(), e);
+        Map<String, Object> config = note.getConfig();
+        if (config != null && config.containsKey("releaseresource")) {
+          releaseResource = (boolean) note.getConfig().get("releaseresource");
+        }
+      } catch (ClassCastException e) {
+        logger.error(e.getMessage(), e);
       }
       if (releaseResource) {
         for (InterpreterSetting setting : note.getNoteReplLoader().getInterpreterSettings()) {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index 65210f5..bf17c35 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -23,6 +23,7 @@ import org.apache.zeppelin.display.Input;
 import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.Interpreter.FormType;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.resource.ResourcePool;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.JobListener;
 import org.slf4j.Logger;
@@ -256,10 +257,12 @@ public class Paragraph extends Job implements Serializable, Cloneable {
 
   private InterpreterContext getInterpreterContext() {
     AngularObjectRegistry registry = null;
+    ResourcePool resourcePool = null;
 
     if (!getNoteReplLoader().getInterpreterSettings().isEmpty()) {
       InterpreterSetting intpGroup = getNoteReplLoader().getInterpreterSettings().get(0);
       registry = intpGroup.getInterpreterGroup().getAngularObjectRegistry();
+      resourcePool = intpGroup.getInterpreterGroup().getResourcePool();
     }
 
     List<InterpreterContextRunner> runners = new LinkedList<InterpreterContextRunner>();
@@ -276,6 +279,7 @@ public class Paragraph extends Job implements Serializable, Cloneable {
             this.getConfig(),
             this.settings,
             registry,
+            resourcePool,
             runners,
             new InterpreterOutput(new InterpreterOutputListener() {
               @Override

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/ddf2c89e/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
index 8fea693..d9e965e 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterFactoryTest.java
@@ -62,7 +62,7 @@ public class InterpreterFactoryTest {
     conf = new ZeppelinConfiguration();
     depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo");
     factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, depResolver);
-    context = new InterpreterContext("note", "id", "title", "text", null, null, null, null, null);
+    context = new InterpreterContext("note", "id", "title", "text", null, null, null, null, null, null);
 
   }
 

Reply via email to