http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
index d0025d8..191902a 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java
@@ -41,6 +41,7 @@ public abstract class Job {
   /**
    * Job status.
    *
+   * UNKNOWN - Job is not found in remote
    * READY - Job is not running, ready to run.
    * PENDING - Job is submitted to scheduler. but not running yet
    * RUNNING - Job is running.
@@ -48,8 +49,8 @@ public abstract class Job {
    * ERROR - Job finished run. with error
    * ABORT - Job finished by abort
    */
-  public static enum Status {
-    READY, PENDING, RUNNING, FINISHED, ERROR, ABORT;
+  public enum Status {
+    UNKNOWN, READY, PENDING, RUNNING, FINISHED, ERROR, ABORT;
 
     public boolean isReady() {
       return this == READY;
@@ -70,14 +71,14 @@ public abstract class Job {
   Date dateCreated;
   Date dateStarted;
   Date dateFinished;
-  Status status;
+  volatile Status status;
 
   static Logger LOGGER = LoggerFactory.getLogger(Job.class);
 
   transient boolean aborted = false;
 
-  private String errorMessage;
-  private transient Throwable exception;
+  private volatile String errorMessage;
+  private transient volatile Throwable exception;
   private transient JobListener listener;
   private long progressUpdateIntervalMs;
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
deleted file mode 100644
index f9ddc4e..0000000
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
+++ /dev/null
@@ -1,426 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.scheduler;
-
-import org.apache.thrift.TException;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
-import org.apache.zeppelin.scheduler.Job.Status;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-
-/**
- * RemoteScheduler runs in ZeppelinServer and proxies Scheduler running on 
RemoteInterpreter
- */
-public class RemoteScheduler implements Scheduler {
-  Logger logger = LoggerFactory.getLogger(RemoteScheduler.class);
-
-  List<Job> queue = new LinkedList<>();
-  List<Job> running = new LinkedList<>();
-  private ExecutorService executor;
-  private SchedulerListener listener;
-  boolean terminate = false;
-  private String name;
-  private int maxConcurrency;
-  private final String noteId;
-  private RemoteInterpreterProcess interpreterProcess;
-
-  public RemoteScheduler(String name, ExecutorService executor, String noteId,
-                         RemoteInterpreterProcess interpreterProcess, 
SchedulerListener listener,
-                         int maxConcurrency) {
-    this.name = name;
-    this.executor = executor;
-    this.listener = listener;
-    this.noteId = noteId;
-    this.interpreterProcess = interpreterProcess;
-    this.maxConcurrency = maxConcurrency;
-  }
-
-  @Override
-  public void run() {
-    while (terminate == false) {
-      Job job = null;
-
-      synchronized (queue) {
-        if (running.size() >= maxConcurrency || queue.isEmpty() == true) {
-          try {
-            queue.wait(500);
-          } catch (InterruptedException e) {
-            logger.error("Exception in RemoteScheduler while run queue.wait", 
e);
-          }
-          continue;
-        }
-
-        job = queue.remove(0);
-        running.add(job);
-      }
-
-      // run
-      Scheduler scheduler = this;
-      JobRunner jobRunner = new JobRunner(scheduler, job);
-      executor.execute(jobRunner);
-
-      // wait until it is submitted to the remote
-      while (!jobRunner.isJobSubmittedInRemote()) {
-        synchronized (queue) {
-          try {
-            queue.wait(500);
-          } catch (InterruptedException e) {
-            logger.error("Exception in RemoteScheduler while 
jobRunner.isJobSubmittedInRemote " +
-                "queue.wait", e);
-          }
-        }
-      }
-    }
-  }
-
-  @Override
-  public String getName() {
-    return name;
-  }
-
-  @Override
-  public Collection<Job> getJobsWaiting() {
-    List<Job> ret = new LinkedList<>();
-    synchronized (queue) {
-      for (Job job : queue) {
-        ret.add(job);
-      }
-    }
-    return ret;
-  }
-
-  @Override
-  public Job removeFromWaitingQueue(String jobId) {
-    synchronized (queue) {
-      Iterator<Job> it = queue.iterator();
-      while (it.hasNext()) {
-        Job job = it.next();
-        if (job.getId().equals(jobId)) {
-          it.remove();
-          return job;
-        }
-      }
-    }
-    return null;
-  }
-
-  @Override
-  public Collection<Job> getJobsRunning() {
-    List<Job> ret = new LinkedList<>();
-    synchronized (queue) {
-      for (Job job : running) {
-        ret.add(job);
-      }
-    }
-    return ret;
-  }
-
-  @Override
-  public void submit(Job job) {
-    if (terminate) {
-      throw new RuntimeException("Scheduler already terminated");
-    }
-    job.setStatus(Status.PENDING);
-
-    synchronized (queue) {
-      queue.add(job);
-      queue.notify();
-    }
-  }
-
-  public void setMaxConcurrency(int maxConcurrency) {
-    this.maxConcurrency = maxConcurrency;
-    synchronized (queue) {
-      queue.notify();
-    }
-  }
-
-  /**
-   * Role of the class is get status info from remote process from PENDING to
-   * RUNNING status.
-   */
-  private class JobStatusPoller extends Thread {
-    private long initialPeriodMsec;
-    private long initialPeriodCheckIntervalMsec;
-    private long checkIntervalMsec;
-    private boolean terminate;
-    private JobListener listener;
-    private Job job;
-    Status lastStatus;
-
-    public JobStatusPoller(long initialPeriodMsec,
-        long initialPeriodCheckIntervalMsec, long checkIntervalMsec, Job job,
-        JobListener listener) {
-      this.initialPeriodMsec = initialPeriodMsec;
-      this.initialPeriodCheckIntervalMsec = initialPeriodCheckIntervalMsec;
-      this.checkIntervalMsec = checkIntervalMsec;
-      this.job = job;
-      this.listener = listener;
-      this.terminate = false;
-    }
-
-    @Override
-    public void run() {
-      long started = System.currentTimeMillis();
-      while (terminate == false) {
-        long current = System.currentTimeMillis();
-        long interval;
-        if (current - started < initialPeriodMsec) {
-          interval = initialPeriodCheckIntervalMsec;
-        } else {
-          interval = checkIntervalMsec;
-        }
-
-        synchronized (this) {
-          try {
-            this.wait(interval);
-          } catch (InterruptedException e) {
-            logger.error("Exception in RemoteScheduler while run this.wait", 
e);
-          }
-        }
-
-        if (terminate) {
-          // terminated by shutdown
-          break;
-        }
-
-        Status newStatus = getStatus();
-        if (newStatus == null) { // unknown
-          continue;
-        }
-
-        if (newStatus != Status.READY && newStatus != Status.PENDING) {
-          // we don't need more
-          break;
-        }
-      }
-      terminate = true;
-    }
-
-    public void shutdown() {
-      terminate = true;
-      synchronized (this) {
-        this.notify();
-      }
-    }
-
-
-    private Status getLastStatus() {
-      if (terminate == true) {
-        if (lastStatus != Status.FINISHED &&
-            lastStatus != Status.ERROR &&
-            lastStatus != Status.ABORT) {
-          return Status.FINISHED;
-        } else {
-          return (lastStatus == null) ? Status.FINISHED : lastStatus;
-        }
-      } else {
-        return (lastStatus == null) ? Status.FINISHED : lastStatus;
-      }
-    }
-
-    public synchronized Job.Status getStatus() {
-      if (interpreterProcess.referenceCount() <= 0) {
-        return getLastStatus();
-      }
-
-      Client client;
-      try {
-        client = interpreterProcess.getClient();
-      } catch (Exception e) {
-        logger.error("Can't get status information", e);
-        lastStatus = Status.ERROR;
-        return Status.ERROR;
-      }
-
-      boolean broken = false;
-      try {
-        String statusStr = client.getStatus(noteId, job.getId());
-        if ("Unknown".equals(statusStr)) {
-          // not found this job in the remote schedulers.
-          // maybe not submitted, maybe already finished
-          //Status status = getLastStatus();
-          listener.afterStatusChange(job, null, null);
-          return job.getStatus();
-        }
-        Status status = Status.valueOf(statusStr);
-        lastStatus = status;
-        listener.afterStatusChange(job, null, status);
-        return status;
-      } catch (TException e) {
-        broken = true;
-        logger.error("Can't get status information", e);
-        lastStatus = Status.ERROR;
-        return Status.ERROR;
-      } catch (Exception e) {
-        logger.error("Unknown status", e);
-        lastStatus = Status.ERROR;
-        return Status.ERROR;
-      } finally {
-        interpreterProcess.releaseClient(client, broken);
-      }
-    }
-  }
-
-  private class JobRunner implements Runnable, JobListener {
-    private Scheduler scheduler;
-    private Job job;
-    private boolean jobExecuted;
-    boolean jobSubmittedRemotely;
-
-    public JobRunner(Scheduler scheduler, Job job) {
-      this.scheduler = scheduler;
-      this.job = job;
-      jobExecuted = false;
-      jobSubmittedRemotely = false;
-    }
-
-    public boolean isJobSubmittedInRemote() {
-      return jobSubmittedRemotely;
-    }
-
-    @Override
-    public void run() {
-      if (job.isAborted()) {
-        synchronized (queue) {
-          job.setStatus(Status.ABORT);
-          job.aborted = false;
-
-          running.remove(job);
-          queue.notify();
-        }
-        jobSubmittedRemotely = true;
-
-        return;
-      }
-
-      JobStatusPoller jobStatusPoller = new JobStatusPoller(1500, 100, 500,
-          job, this);
-      jobStatusPoller.start();
-
-      if (listener != null) {
-        listener.jobStarted(scheduler, job);
-      }
-      job.run();
-
-      jobExecuted = true;
-      jobSubmittedRemotely = true;
-
-      jobStatusPoller.shutdown();
-      try {
-        jobStatusPoller.join();
-      } catch (InterruptedException e) {
-        logger.error("JobStatusPoller interrupted", e);
-      }
-
-      // set job status based on result.
-      Status lastStatus = jobStatusPoller.getStatus();
-      Object jobResult = job.getReturn();
-      if (jobResult != null && jobResult instanceof InterpreterResult) {
-        if (((InterpreterResult) jobResult).code() == Code.ERROR) {
-          lastStatus = Status.ERROR;
-        }
-      }
-      if (job.getException() != null) {
-        lastStatus = Status.ERROR;
-      }
-
-      synchronized (queue) {
-        job.setStatus(lastStatus);
-
-        if (listener != null) {
-          listener.jobFinished(scheduler, job);
-        }
-
-        // reset aborted flag to allow retry
-        job.aborted = false;
-
-        running.remove(job);
-        queue.notify();
-      }
-    }
-
-    @Override
-    public void onProgressUpdate(Job job, int progress) {
-    }
-
-    @Override
-    public void beforeStatusChange(Job job, Status before, Status after) {
-    }
-
-    @Override
-    public void afterStatusChange(Job job, Status before, Status after) {
-      if (after == null) { // unknown. maybe before sumitted remotely, maybe 
already finished.
-        if (jobExecuted) {
-          jobSubmittedRemotely = true;
-          Object jobResult = job.getReturn();
-          if (job.isAborted()) {
-            job.setStatus(Status.ABORT);
-          } else if (job.getException() != null) {
-            job.setStatus(Status.ERROR);
-          } else if (jobResult != null && jobResult instanceof 
InterpreterResult
-              && ((InterpreterResult) jobResult).code() == Code.ERROR) {
-            job.setStatus(Status.ERROR);
-          } else {
-            job.setStatus(Status.FINISHED);
-          }
-        }
-        return;
-      }
-
-
-      // Update remoteStatus
-      if (jobExecuted == false) {
-        if (after == Status.FINISHED || after == Status.ABORT
-            || after == Status.ERROR) {
-          // it can be status of last run.
-          // so not updating the remoteStatus
-          return;
-        } else if (after == Status.RUNNING) {
-          jobSubmittedRemotely = true;
-        }
-      } else {
-        jobSubmittedRemotely = true;
-      }
-
-      // status polled by status poller
-      if (job.getStatus() != after) {
-        job.setStatus(after);
-      }
-    }
-  }
-
-  @Override
-  public void stop() {
-    terminate = true;
-    synchronized (queue) {
-      queue.notify();
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
index af52dec..b629ef7 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
@@ -17,24 +17,21 @@
 
 package org.apache.zeppelin.scheduler;
 
-import java.util.Collection;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * TODO(moon) : add description.
+ * Factory class for creating schedulers
+ *
  */
 public class SchedulerFactory implements SchedulerListener {
   private static final Logger logger = 
LoggerFactory.getLogger(SchedulerFactory.class);
-  ExecutorService executor;
-  Map<String, Scheduler> schedulers = new LinkedHashMap<>();
+  protected ExecutorService executor;
+  protected Map<String, Scheduler> schedulers = new LinkedHashMap<>();
 
   private static SchedulerFactory singleton;
   private static Long singletonLock = new Long(0);
@@ -54,17 +51,17 @@ public class SchedulerFactory implements SchedulerListener {
     return singleton;
   }
 
-  public SchedulerFactory() throws Exception {
-    executor = ExecutorFactory.singleton().createOrGet("schedulerFactory", 
100);
+  SchedulerFactory() throws Exception {
+    executor = ExecutorFactory.singleton().createOrGet("SchedulerFactory", 
100);
   }
 
   public void destroy() {
-    ExecutorFactory.singleton().shutdown("schedulerFactory");
+    ExecutorFactory.singleton().shutdown("SchedulerFactory");
   }
 
   public Scheduler createOrGetFIFOScheduler(String name) {
     synchronized (schedulers) {
-      if (schedulers.containsKey(name) == false) {
+      if (!schedulers.containsKey(name)) {
         Scheduler s = new FIFOScheduler(name, executor, this);
         schedulers.put(name, s);
         executor.execute(s);
@@ -75,7 +72,7 @@ public class SchedulerFactory implements SchedulerListener {
 
   public Scheduler createOrGetParallelScheduler(String name, int 
maxConcurrency) {
     synchronized (schedulers) {
-      if (schedulers.containsKey(name) == false) {
+      if (!schedulers.containsKey(name)) {
         Scheduler s = new ParallelScheduler(name, executor, this, 
maxConcurrency);
         schedulers.put(name, s);
         executor.execute(s);
@@ -84,60 +81,38 @@ public class SchedulerFactory implements SchedulerListener {
     }
   }
 
-  public Scheduler createOrGetRemoteScheduler(
-      String name,
-      String noteId,
-      RemoteInterpreterProcess interpreterProcess,
-      int maxConcurrency) {
-
+  public Scheduler createOrGetScheduler(Scheduler scheduler) {
     synchronized (schedulers) {
-      if (schedulers.containsKey(name) == false) {
-        Scheduler s = new RemoteScheduler(
-            name,
-            executor,
-            noteId,
-            interpreterProcess,
-            this,
-            maxConcurrency);
-        schedulers.put(name, s);
-        executor.execute(s);
+      if (!schedulers.containsKey(scheduler.getName())) {
+        schedulers.put(scheduler.getName(), scheduler);
+        executor.execute(scheduler);
       }
-      return schedulers.get(name);
+      return schedulers.get(scheduler.getName());
     }
   }
 
-  public Scheduler removeScheduler(String name) {
+  public void removeScheduler(String name) {
     synchronized (schedulers) {
       Scheduler s = schedulers.remove(name);
       if (s != null) {
         s.stop();
       }
     }
-    return null;
   }
 
-  public Collection<Scheduler> listScheduler(String name) {
-    List<Scheduler> s = new LinkedList<>();
-    synchronized (schedulers) {
-      for (Scheduler ss : schedulers.values()) {
-        s.add(ss);
-      }
-    }
-    return s;
+  public ExecutorService getExecutor() {
+    return executor;
   }
 
   @Override
   public void jobStarted(Scheduler scheduler, Job job) {
-    logger.info("Job " + job.getJobName() + " started by scheduler " + 
scheduler.getName());
+    logger.info("Job " + job.getId() + " started by scheduler " + 
scheduler.getName());
 
   }
 
   @Override
   public void jobFinished(Scheduler scheduler, Job job) {
-    logger.info("Job " + job.getJobName() + " finished by scheduler " + 
scheduler.getName());
+    logger.info("Job " + job.getId() + " finished by scheduler " + 
scheduler.getName());
 
   }
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
index 8673476..1926528 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataProxy.java
@@ -17,7 +17,6 @@
 package org.apache.zeppelin.tabledata;
 
 import org.apache.zeppelin.resource.Resource;
-import org.apache.zeppelin.resource.ResourcePoolUtils;
 
 import java.util.Iterator;
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java
new file mode 100644
index 0000000..14c03a1
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/IdHashes.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.util;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Generate Tiny ID.
+ */
+public class IdHashes {
+  private static final char[] DICTIONARY = new char[] {'1', '2', '3', '4', 
'5', '6', '7', '8', '9',
+    'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'J', 'K', 'M', 'N', 'P', 'Q', 'R', 
'S', 'T', 'U', 'V',
+    'W', 'X', 'Y', 'Z'};
+
+  /**
+   * Encodes the given number in the base defined by the size of the static DICTIONARY.
+   *
+   * @param value the number to encode.
+   * @return the encoded string.
+   */
+  private static String encode(Long value) {
+
+    List<Character> result = new ArrayList<>();
+    BigInteger base = new BigInteger("" + DICTIONARY.length);
+    int exponent = 1;
+    BigInteger remaining = new BigInteger(value.toString());
+    while (true) {
+      BigInteger a = base.pow(exponent); // 16^1 = 16
+      BigInteger b = remaining.mod(a); // 119 % 16 = 7 | 112 % 256 = 112
+      BigInteger c = base.pow(exponent - 1);
+      BigInteger d = b.divide(c);
+
+      // d is always a valid index into DICTIONARY: mod() yields 0 <= b < base^exponent,
+      // so d = b / base^(exponent - 1) is in [0, base). If a defensive check is ever
+      // wanted, BigInteger.compareTo provides the "greater than" comparison.
+      result.add(DICTIONARY[d.intValue()]);
+      remaining = remaining.subtract(b); // 119 - 7 = 112 | 112 - 112 = 0
+
+      // finished?
+      if (remaining.equals(BigInteger.ZERO)) {
+        break;
+      }
+
+      exponent++;
+    }
+
+    // need to reverse it, since the start of the list contains the least 
significant values
+    StringBuffer sb = new StringBuffer();
+    for (int i = result.size() - 1; i >= 0; i--) {
+      sb.append(result.get(i));
+    }
+    return sb.toString();
+  }
+
+  public static String generateId() {
+    return encode(System.currentTimeMillis() + new Random().nextInt());
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java
new file mode 100644
index 0000000..6153f49
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/Util.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.util;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Exposes the Zeppelin version and Git commit metadata read from classpath properties files.
+ */
+public class Util {
+  private static final String PROJECT_PROPERTIES_VERSION_KEY = "version";
+  private static final String GIT_PROPERTIES_COMMIT_ID_KEY = 
"git.commit.id.abbrev";
+  private static final String GIT_PROPERTIES_COMMIT_TS_KEY = "git.commit.time";
+
+  private static Properties projectProperties;
+  private static Properties gitProperties;
+
+  static {
+    projectProperties = new Properties();
+    gitProperties = new Properties();
+    try {
+      
projectProperties.load(Util.class.getResourceAsStream("/project.properties"));
+      gitProperties.load(Util.class.getResourceAsStream("/git.properties"));
+    } catch (IOException e) {
+      // Failed to read project.properties or git.properties; getters return "" for missing keys.
+    }
+  }
+
+  /**
+   * Get Zeppelin version
+   *
+   * @return Current Zeppelin version
+   */
+  public static String getVersion() {
+    return 
StringUtils.defaultIfEmpty(projectProperties.getProperty(PROJECT_PROPERTIES_VERSION_KEY),
+            StringUtils.EMPTY);
+  }
+
+  /**
+   * Get Zeppelin Git latest commit id
+   *
+   * @return Latest Zeppelin commit id
+   */
+  public static String getGitCommitId() {
+    return 
StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_ID_KEY),
+            StringUtils.EMPTY);
+  }
+
+  /**
+   * Get Zeppelin Git latest commit timestamp
+   *
+   * @return Latest Zeppelin commit timestamp
+   */
+  public static String getGitTimestamp() {
+    return 
StringUtils.defaultIfEmpty(gitProperties.getProperty(GIT_PROPERTIES_COMMIT_TS_KEY),
+            StringUtils.EMPTY);
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java
deleted file mode 100644
index a7a6eb9..0000000
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/DummyInterpreter.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.zeppelin.interpreter;
-
-import java.util.Properties;
-
-/**
- *
- */
-public class DummyInterpreter extends Interpreter {
-
-  public DummyInterpreter(Properties property) {
-    super(property);
-  }
-
-  @Override
-  public void open() {
-
-  }
-
-  @Override
-  public void close() {
-
-  }
-
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
-    return null;
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) {
-
-  }
-
-  @Override
-  public FormType getFormType() {
-    return null;
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    return 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java
index e376809..f3a30fb 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputChangeWatcherTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.*;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.junit.After;
 import org.junit.Before;
@@ -29,7 +30,7 @@ import org.junit.Test;
 public class InterpreterOutputChangeWatcherTest implements 
InterpreterOutputChangeListener {
   private File tmpDir;
   private File fileChanged;
-  private int numChanged;
+  private AtomicInteger numChanged;
   private InterpreterOutputChangeWatcher watcher;
 
   @Before
@@ -40,7 +41,7 @@ public class InterpreterOutputChangeWatcherTest implements 
InterpreterOutputChan
     tmpDir = new 
File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis());
     tmpDir.mkdirs();
     fileChanged = null;
-    numChanged = 0;
+    numChanged = new AtomicInteger(0);
   }
 
   @After
@@ -66,7 +67,7 @@ public class InterpreterOutputChangeWatcherTest implements 
InterpreterOutputChan
   @Test
   public void test() throws IOException, InterruptedException {
     assertNull(fileChanged);
-    assertEquals(0, numChanged);
+    assertEquals(0, numChanged.get());
 
     Thread.sleep(1000);
     // create new file
@@ -92,14 +93,14 @@ public class InterpreterOutputChangeWatcherTest implements 
InterpreterOutputChan
     }
 
     assertNotNull(fileChanged);
-    assertEquals(1, numChanged);
+    assertEquals(1, numChanged.get());
   }
 
 
   @Override
   public void fileChanged(File file) {
     fileChanged = file;
-    numChanged++;
+    numChanged.incrementAndGet();
 
     synchronized(this) {
       notify();

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
index 305268c..31c9225 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
@@ -24,6 +24,7 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
+//TODO(zjffdu) add more tests for Interpreter, which is a very important class
 public class InterpreterTest {
 
   @Test
@@ -85,4 +86,41 @@ public class InterpreterTest {
     );
   }
 
+  public static class DummyInterpreter extends Interpreter {
+
+    public DummyInterpreter(Properties property) {
+      super(property);
+    }
+
+    @Override
+    public void open() {
+
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public InterpreterResult interpret(String st, InterpreterContext context) {
+      return null;
+    }
+
+    @Override
+    public void cancel(InterpreterContext context) {
+
+    }
+
+    @Override
+    public FormType getFormType() {
+      return null;
+    }
+
+    @Override
+    public int getProgress(InterpreterContext context) {
+      return 0;
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
new file mode 100644
index 0000000..5f7426a
--- /dev/null
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertTrue;
+
+public class RemoteInterpreterUtilsTest {
+
+  @Test
+  public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException {
+    assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/test/resources/conf/interpreter.json
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/resources/conf/interpreter.json 
b/zeppelin-interpreter/src/test/resources/conf/interpreter.json
new file mode 100644
index 0000000..45e1d60
--- /dev/null
+++ b/zeppelin-interpreter/src/test/resources/conf/interpreter.json
@@ -0,0 +1,115 @@
+{
+  "interpreterSettings": {
+    "2C3RWCVAG": {
+      "id": "2C3RWCVAG",
+      "name": "test",
+      "group": "test",
+      "properties": {
+        "property_1": "value_1",
+        "property_2": "new_value_2",
+        "property_3": "value_3"
+      },
+      "status": "READY",
+      "interpreterGroup": [
+        {
+          "name": "echo",
+          "class": "org.apache.zeppelin.interpreter.EchoInterpreter",
+          "defaultInterpreter": true,
+          "editor": {
+            "language": "java",
+            "editOnDblClick": false
+          }
+        }
+      ],
+      "dependencies": [],
+      "option": {
+        "remote": true,
+        "port": -1,
+        "perNote": "shared",
+        "perUser": "shared",
+        "isExistingProcess": false,
+        "setPermission": false,
+        "users": [],
+        "isUserImpersonate": false
+      }
+    },
+
+    "2CKWE7B19": {
+      "id": "2CKWE7B19",
+      "name": "test2",
+      "group": "test",
+      "properties": {
+        "property_1": "value_1",
+        "property_2": "new_value_2",
+        "property_3": "value_3"
+      },
+      "status": "READY",
+      "interpreterGroup": [
+        {
+          "name": "echo",
+          "class": "org.apache.zeppelin.interpreter.EchoInterpreter",
+          "defaultInterpreter": true,
+          "editor": {
+            "language": "java",
+            "editOnDblClick": false
+          }
+        }
+      ],
+      "dependencies": [],
+      "option": {
+        "remote": true,
+        "port": -1,
+        "perNote": "shared",
+        "perUser": "shared",
+        "isExistingProcess": false,
+        "setPermission": false,
+        "users": [],
+        "isUserImpersonate": false
+      }
+    }
+  },
+  "interpreterBindings": {
+    "2C6793KRV": [
+      "2C48Y7FSJ",
+      "2C63XW4XE",
+      "2C66GE1VB",
+      "2C5VH924X",
+      "2C4BJDRRZ",
+      "2C3SQSB7V",
+      "2C4HKDCQW",
+      "2C3DR183X",
+      "2C66Z9XPQ",
+      "2C3PTPMUH",
+      "2C69WE69N",
+      "2C5SRRXHM",
+      "2C4ZD49PF",
+      "2C6V3D44K",
+      "2C4UB1UZA",
+      "2C5S1R21W",
+      "2C5DCRVGM",
+      "2C686X8ZH",
+      "2C3RWCVAG",
+      "2C3JKFMJU",
+      "2C3VECEG2"
+    ]
+  },
+  "interpreterRepositories": [
+    {
+      "id": "central",
+      "type": "default",
+      "url": "http://repo1.maven.org/maven2/",
+      "releasePolicy": {
+        "enabled": true,
+        "updatePolicy": "daily",
+        "checksumPolicy": "warn"
+      },
+      "snapshotPolicy": {
+        "enabled": true,
+        "updatePolicy": "daily",
+        "checksumPolicy": "warn"
+      },
+      "mirroredRepositories": [],
+      "repositoryManager": false
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json
 
b/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json
new file mode 100644
index 0000000..1ba1b94
--- /dev/null
+++ 
b/zeppelin-interpreter/src/test/resources/interpreter/test/interpreter-setting.json
@@ -0,0 +1,42 @@
+[
+  {
+    "group": "test",
+    "name": "double_echo",
+    "className": "org.apache.zeppelin.interpreter.DoubleEchoInterpreter",
+    "properties": {
+      "property_1": {
+        "envName": "PROPERTY_1",
+        "propertyName": "property_1",
+        "defaultValue": "value_1",
+        "description": "desc_1"
+      },
+      "property_2": {
+        "envName": "PROPERTY_2",
+        "propertyName": "property_2",
+        "defaultValue": "value_2",
+        "description": "desc_2"
+      }
+    }
+  },
+
+  {
+    "group": "test",
+    "name": "echo",
+    "defaultInterpreter": true,
+    "className": "org.apache.zeppelin.interpreter.EchoInterpreter",
+    "properties": {
+      "property_1": {
+        "envName": "PROPERTY_1",
+        "propertyName": "property_1",
+        "defaultValue": "value_1",
+        "description": "desc_1"
+      },
+      "property_2": {
+        "envName": "PROPERTY_2",
+        "propertyName": "property_2",
+        "defaultValue": "value_2",
+        "description": "desc_2"
+      }
+    }
+  }
+]

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-interpreter/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/resources/log4j.properties 
b/zeppelin-interpreter/src/test/resources/log4j.properties
index d8a7839..6f34691 100644
--- a/zeppelin-interpreter/src/test/resources/log4j.properties
+++ b/zeppelin-interpreter/src/test/resources/log4j.properties
@@ -26,4 +26,6 @@ log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n
 #
 
 # Root logger option
-log4j.rootLogger=INFO, stdout
\ No newline at end of file
+log4j.rootLogger=INFO, stdout
+log4j.logger.org.apache.zeppelin.interpreter=DEBUG
+log4j.logger.org.apache.zeppelin.scheduler=DEBUG
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
 
b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
index cd0210e..c1dba5c 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
@@ -185,7 +185,7 @@ public class InterpreterRestApi {
 
       String noteId = request == null ? null : request.getNoteId();
       if (null == noteId) {
-        interpreterSettingManager.close(setting);
+        interpreterSettingManager.close(settingId);
       } else {
         interpreterSettingManager.restart(settingId, noteId, 
SecurityUtils.getPrincipal());
       }
@@ -208,7 +208,7 @@ public class InterpreterRestApi {
   @GET
   @ZeppelinApi
   public Response listInterpreter(String message) {
-    Map<String, InterpreterSetting> m = 
interpreterSettingManager.getAvailableInterpreterSettings();
+    Map<String, InterpreterSetting> m = 
interpreterSettingManager.getInterpreterSettingTemplates();
     return new JsonResponse<>(Status.OK, "", m).build();
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java 
b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index 7453470..c103eeb 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -31,12 +31,10 @@ import org.apache.shiro.web.env.EnvironmentLoaderListener;
 import org.apache.shiro.web.servlet.ShiroFilter;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
-import org.apache.zeppelin.dep.DependencyResolver;
 import org.apache.zeppelin.helium.Helium;
 import org.apache.zeppelin.helium.HeliumApplicationFactory;
 import org.apache.zeppelin.helium.HeliumBundleFactory;
 import org.apache.zeppelin.interpreter.InterpreterFactory;
-import org.apache.zeppelin.interpreter.InterpreterOption;
 import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterSettingManager;
 import org.apache.zeppelin.notebook.Notebook;
@@ -93,13 +91,11 @@ public class ZeppelinServer extends Application {
   private NotebookRepoSync notebookRepo;
   private NotebookAuthorization notebookAuthorization;
   private Credentials credentials;
-  private DependencyResolver depResolver;
 
   public ZeppelinServer() throws Exception {
     ZeppelinConfiguration conf = ZeppelinConfiguration.create();
 
-    this.depResolver = new DependencyResolver(
-        conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO));
+
 
     InterpreterOutput.limit = 
conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT);
 
@@ -129,13 +125,26 @@ public class ZeppelinServer extends Application {
           new File(conf.getRelativeDir("zeppelin-web/src/app/spell")));
     }
 
+    this.schedulerFactory = SchedulerFactory.singleton();
+    this.interpreterSettingManager = new InterpreterSettingManager(conf, 
notebookWsServer,
+        notebookWsServer, notebookWsServer);
+    this.replFactory = new InterpreterFactory(interpreterSettingManager);
+    this.notebookRepo = new NotebookRepoSync(conf);
+    this.noteSearchService = new LuceneSearch();
+    this.notebookAuthorization = NotebookAuthorization.init(conf);
+    this.credentials = new Credentials(conf.credentialsPersist(), 
conf.getCredentialsPath());
+    notebook = new Notebook(conf,
+        notebookRepo, schedulerFactory, replFactory, 
interpreterSettingManager, notebookWsServer,
+            noteSearchService, notebookAuthorization, credentials);
+
     ZeppelinServer.helium = new Helium(
         conf.getHeliumConfPath(),
         conf.getHeliumRegistry(),
         new File(conf.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO),
             "helium-registry-cache"),
         heliumBundleFactory,
-        heliumApplicationFactory);
+        heliumApplicationFactory,
+        interpreterSettingManager);
 
     // create bundle
     try {
@@ -144,20 +153,6 @@ public class ZeppelinServer extends Application {
       LOG.error(e.getMessage(), e);
     }
 
-    this.schedulerFactory = new SchedulerFactory();
-    this.interpreterSettingManager = new InterpreterSettingManager(conf, 
depResolver,
-        new InterpreterOption(true));
-    this.replFactory = new InterpreterFactory(conf, notebookWsServer,
-        notebookWsServer, heliumApplicationFactory, depResolver, 
SecurityUtils.isAuthenticated(),
-        interpreterSettingManager);
-    this.notebookRepo = new NotebookRepoSync(conf);
-    this.noteSearchService = new LuceneSearch();
-    this.notebookAuthorization = NotebookAuthorization.init(conf);
-    this.credentials = new Credentials(conf.credentialsPersist(), 
conf.getCredentialsPath());
-    notebook = new Notebook(conf,
-        notebookRepo, schedulerFactory, replFactory, 
interpreterSettingManager, notebookWsServer,
-            noteSearchService, notebookAuthorization, credentials);
-
     // to update notebook from application event from remote process.
     heliumApplicationFactory.setNotebook(notebook);
     // to update fire websocket event on application event.
@@ -206,7 +201,7 @@ public class ZeppelinServer extends Application {
         LOG.info("Shutting down Zeppelin Server ... ");
         try {
           jettyWebServer.stop();
-          notebook.getInterpreterSettingManager().shutdown();
+          notebook.getInterpreterSettingManager().close();
           notebook.close();
           Thread.sleep(3000);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java 
b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index f0e0bb2..2e3a5c7 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -41,12 +41,7 @@ import 
org.apache.zeppelin.display.AngularObjectRegistryListener;
 import org.apache.zeppelin.display.Input;
 import org.apache.zeppelin.helium.ApplicationEventListener;
 import org.apache.zeppelin.helium.HeliumPackage;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResultMessage;
-import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
@@ -59,7 +54,6 @@ import org.apache.zeppelin.notebook.NotebookEventListener;
 import org.apache.zeppelin.notebook.NotebookImportDeserializer;
 import org.apache.zeppelin.notebook.Paragraph;
 import org.apache.zeppelin.notebook.ParagraphJobListener;
-import org.apache.zeppelin.notebook.ParagraphRuntimeInfo;
 import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision;
 import org.apache.zeppelin.notebook.socket.Message;
 import org.apache.zeppelin.notebook.socket.Message.OP;
@@ -86,8 +80,6 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Queues;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
 import com.google.gson.reflect.TypeToken;
 
 /**
@@ -463,7 +455,8 @@ public class NotebookServer extends WebSocketServlet
     Notebook notebook = notebook();
     List<Note> notes = notebook.getAllNotes();
     for (Note note : notes) {
-      List<String> ids = 
notebook.getInterpreterSettingManager().getInterpreters(note.getId());
+      List<String> ids = notebook.getInterpreterSettingManager()
+          .getInterpreterBinding(note.getId());
       for (String id : ids) {
         if (id.equals(interpreterGroupId)) {
           broadcast(note.getId(), m);
@@ -1022,7 +1015,7 @@ public class NotebookServer extends WebSocketServlet
         List<String> interpreterSettingIds = new LinkedList<>();
         interpreterSettingIds.add(defaultInterpreterId);
         for (String interpreterSettingId : 
notebook.getInterpreterSettingManager().
-            getDefaultInterpreterSettingList()) {
+            getInterpreterSettingIds()) {
           if (!interpreterSettingId.equals(defaultInterpreterId)) {
             interpreterSettingIds.add(interpreterSettingId);
           }
@@ -1382,12 +1375,13 @@ public class NotebookServer extends WebSocketServlet
       List<InterpreterSetting> settings =
           
notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId());
       for (InterpreterSetting setting : settings) {
-        if (setting.getInterpreterGroup(user, note.getId()) == null) {
+        if (setting.getOrCreateInterpreterGroup(user, note.getId()) == null) {
           continue;
         }
-        if (interpreterGroupId.equals(setting.getInterpreterGroup(user, 
note.getId()).getId())) {
+        if 
(interpreterGroupId.equals(setting.getOrCreateInterpreterGroup(user, 
note.getId())
+            .getId())) {
           AngularObjectRegistry angularObjectRegistry =
-              setting.getInterpreterGroup(user, 
note.getId()).getAngularObjectRegistry();
+              setting.getOrCreateInterpreterGroup(user, 
note.getId()).getAngularObjectRegistry();
 
           // first trying to get local registry
           ao = angularObjectRegistry.get(varName, noteId, paragraphId);
@@ -1424,12 +1418,13 @@ public class NotebookServer extends WebSocketServlet
         List<InterpreterSetting> settings =
             
notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId());
         for (InterpreterSetting setting : settings) {
-          if (setting.getInterpreterGroup(user, n.getId()) == null) {
+          if (setting.getOrCreateInterpreterGroup(user, n.getId()) == null) {
             continue;
           }
-          if (interpreterGroupId.equals(setting.getInterpreterGroup(user, 
n.getId()).getId())) {
+          if 
(interpreterGroupId.equals(setting.getOrCreateInterpreterGroup(user, n.getId())
+              .getId())) {
             AngularObjectRegistry angularObjectRegistry =
-                setting.getInterpreterGroup(user, 
n.getId()).getAngularObjectRegistry();
+                setting.getOrCreateInterpreterGroup(user, 
n.getId()).getAngularObjectRegistry();
             this.broadcastExcept(n.getId(),
                 new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
                     .put("interpreterGroupId", 
interpreterGroupId).put("noteId", n.getId())
@@ -2302,13 +2297,13 @@ public class NotebookServer extends WebSocketServlet
 
     for (InterpreterSetting intpSetting : settings) {
       AngularObjectRegistry registry =
-          intpSetting.getInterpreterGroup(user, 
note.getId()).getAngularObjectRegistry();
+          intpSetting.getOrCreateInterpreterGroup(user, 
note.getId()).getAngularObjectRegistry();
       List<AngularObject> objects = registry.getAllWithGlobal(note.getId());
       for (AngularObject object : objects) {
         conn.send(serializeMessage(
             new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", object)
                 .put("interpreterGroupId",
-                    intpSetting.getInterpreterGroup(user, 
note.getId()).getId())
+                    intpSetting.getOrCreateInterpreterGroup(user, 
note.getId()).getId())
                 .put("noteId", note.getId()).put("paragraphId", 
object.getParagraphId())));
       }
     }
@@ -2354,7 +2349,7 @@ public class NotebookServer extends WebSocketServlet
       }
 
       List<String> settingIds =
-          
notebook.getInterpreterSettingManager().getInterpreters(note.getId());
+          
notebook.getInterpreterSettingManager().getInterpreterBinding(note.getId());
       for (String id : settingIds) {
         if (interpreterGroupId.contains(id)) {
           broadcast(note.getId(),

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
deleted file mode 100644
index 1b1306a..0000000
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/interpreter/mock/MockInterpreter1.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.interpreter.mock;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.scheduler.Scheduler;
-import org.apache.zeppelin.scheduler.SchedulerFactory;
-
-public class MockInterpreter1 extends Interpreter{
-  Map<String, Object> vars = new HashMap<>();
-
-  public MockInterpreter1(Properties property) {
-    super(property);
-  }
-
-  @Override
-  public void open() {
-  }
-
-  @Override
-  public void close() {
-  }
-
-  @Override
-  public InterpreterResult interpret(String st, InterpreterContext context) {
-    return new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: "+st);
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) {
-  }
-
-  @Override
-  public FormType getFormType() {
-    return FormType.SIMPLE;
-  }
-
-  @Override
-  public int getProgress(InterpreterContext context) {
-    return 0;
-  }
-
-  @Override
-  public Scheduler getScheduler() {
-    return 
SchedulerFactory.singleton().createOrGetFIFOScheduler("test_"+this.hashCode());
-  }
-
-  @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-      InterpreterContext interpreterContext) {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
index a7907db..e2f171f 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
@@ -307,10 +307,9 @@ public abstract class AbstractTestRestApi {
   protected static void shutDown() throws Exception {
     if (!wasRunning) {
       // restart interpreter to stop all interpreter processes
-      List<String> settingList = 
ZeppelinServer.notebook.getInterpreterSettingManager()
-          .getDefaultInterpreterSettingList();
-      for (String setting : settingList) {
-        
ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting);
+      List<InterpreterSetting> settingList = 
ZeppelinServer.notebook.getInterpreterSettingManager().get();
+      for (InterpreterSetting setting : settingList) {
+        
ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId());
       }
       if (shiroIni != null) {
         FileUtils.deleteQuietly(shiroIni);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java
index 28541bd..72dd8a7 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/InterpreterRestApiTest.java
@@ -80,7 +80,7 @@ public class InterpreterRestApiTest extends 
AbstractTestRestApi {
 
     // then
     assertThat(get, isAllowed());
-    
assertEquals(ZeppelinServer.notebook.getInterpreterSettingManager().getAvailableInterpreterSettings().size(),
+    
assertEquals(ZeppelinServer.notebook.getInterpreterSettingManager().getInterpreterSettingTemplates().size(),
         body.entrySet().size());
     get.releaseConnection();
   }
@@ -110,7 +110,7 @@ public class InterpreterRestApiTest extends 
AbstractTestRestApi {
   @Test
   public void testSettingsCRUD() throws IOException {
     // when: call create setting API
-    String rawRequest = "{\"name\":\"md2\",\"group\":\"md\"," +
+    String rawRequest = "{\"name\":\"md3\",\"group\":\"md\"," +
         "\"properties\":{\"propname\": {\"value\": \"propvalue\", \"name\": 
\"propname\", \"type\": \"textarea\"}}," +
         
"\"interpreterGroup\":[{\"class\":\"org.apache.zeppelin.markdown.Markdown\",\"name\":\"md\"}],"
 +
         "\"dependencies\":[]," +
@@ -367,7 +367,7 @@ public class InterpreterRestApiTest extends 
AbstractTestRestApi {
 
   @Test
   public void testGetMetadataInfo() throws IOException {
-    String jsonRequest = "{\"name\":\"spark\",\"group\":\"spark\"," +
+    String jsonRequest = "{\"name\":\"spark_new\",\"group\":\"spark\"," +
             "\"properties\":{\"propname\": {\"value\": \"propvalue\", 
\"name\": \"propname\", \"type\": \"textarea\"}}," +
             
"\"interpreterGroup\":[{\"class\":\"org.apache.zeppelin.markdown.Markdown\",\"name\":\"md\"}],"
 +
             "\"dependencies\":[]," +

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
index 8da36a6..10d77b2 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
@@ -30,6 +30,7 @@ import org.apache.zeppelin.notebook.Paragraph;
 import org.apache.zeppelin.notebook.socket.Message;
 import org.apache.zeppelin.notebook.socket.Message.OP;
 import org.apache.zeppelin.rest.AbstractTestRestApi;
+import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.server.ZeppelinServer;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.junit.AfterClass;
@@ -95,7 +96,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
   }
 
   @Test
-  public void testMakeSureNoAngularObjectBroadcastToWebsocketWhoFireTheEvent() 
throws IOException {
+  public void testMakeSureNoAngularObjectBroadcastToWebsocketWhoFireTheEvent() 
throws IOException, InterruptedException {
     // create a notebook
     Note note1 = notebook.createNote(anonymous);
 
@@ -104,7 +105,7 @@ public class NotebookServerTest extends AbstractTestRestApi 
{
     List<InterpreterSetting> settings = 
notebook.getInterpreterSettingManager().getInterpreterSettings(note1.getId());
     for (InterpreterSetting setting : settings) {
       if (setting.getName().equals("md")) {
-        interpreterGroup = setting.getInterpreterGroup("anonymous", 
"sharedProcess");
+        interpreterGroup = setting.getOrCreateInterpreterGroup("anonymous", 
"sharedProcess");
         break;
       }
     }
@@ -115,6 +116,14 @@ public class NotebookServerTest extends 
AbstractTestRestApi {
     p1.setAuthenticationInfo(anonymous);
     note1.run(p1.getId());
 
+    // wait for paragraph finished
+    while(true) {
+      if (p1.getStatus() == Job.Status.FINISHED) {
+        break;
+      }
+      Thread.sleep(100);
+    }
+
     // add angularObject
     interpreterGroup.getAngularObjectRegistry().add("object1", "value1", 
note1.getId(), null);
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-server/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/resources/log4j.properties 
b/zeppelin-server/src/test/resources/log4j.properties
index b0d1067..8368993 100644
--- a/zeppelin-server/src/test/resources/log4j.properties
+++ b/zeppelin-server/src/test/resources/log4j.properties
@@ -33,7 +33,6 @@ log4j.logger.org.apache.hadoop.mapred=WARN
 log4j.logger.org.apache.hadoop.hive.ql=WARN
 log4j.logger.org.apache.hadoop.hive.metastore=WARN
 log4j.logger.org.apache.haadoop.hive.service.HiveServer=WARN
-log4j.logger.org.apache.zeppelin.scheduler=WARN
 
 log4j.logger.org.quartz=WARN
 log4j.logger.DataNucleus=WARN

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index f00fe93..03cc069 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -17,22 +17,21 @@
 
 package org.apache.zeppelin.conf;
 
-import java.io.File;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.XMLConfiguration;
 import org.apache.commons.configuration.tree.ConfigurationNode;
 import org.apache.commons.lang.StringUtils;
-import org.apache.zeppelin.notebook.repo.GitNotebookRepo;
 import org.apache.zeppelin.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 /**
  * Zeppelin configuration.
  *
@@ -528,7 +527,7 @@ public class ZeppelinConfiguration extends XMLConfiguration 
{
                                                 ConfigurationKeyPredicate 
predicate) {
     Map<String, String> configurations = new HashMap<>();
 
-    for (ZeppelinConfiguration.ConfVars v : 
ZeppelinConfiguration.ConfVars.values()) {
+    for (ConfVars v : ConfVars.values()) {
       String key = v.getVarName();
 
       if (!predicate.apply(key)) {
@@ -653,7 +652,8 @@ public class ZeppelinConfiguration extends XMLConfiguration 
{
     ZEPPELIN_NOTEBOOK_MONGO_COLLECTION("zeppelin.notebook.mongo.collection", 
"notes"),
     ZEPPELIN_NOTEBOOK_MONGO_URI("zeppelin.notebook.mongo.uri", 
"mongodb://localhost"),
     ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT("zeppelin.notebook.mongo.autoimport", 
false),
-    ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage", 
GitNotebookRepo.class.getName()),
+    ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage",
+        "org.apache.zeppelin.notebook.repo.GitNotebookRepo"),
     ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC("zeppelin.notebook.one.way.sync", false),
     // whether by default note is public or private
     ZEPPELIN_NOTEBOOK_PUBLIC("zeppelin.notebook.public", true),

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/Helium.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/Helium.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/Helium.java
index 17a3529..e5f2e3b 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/Helium.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/Helium.java
@@ -16,14 +16,17 @@
  */
 package org.apache.zeppelin.helium;
 
+import com.google.gson.Gson;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterSettingManager;
+import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
 import org.apache.zeppelin.notebook.Paragraph;
-import org.apache.zeppelin.resource.DistributedResourcePool;
-import org.apache.zeppelin.resource.ResourcePool;
-import org.apache.zeppelin.resource.ResourcePoolUtils;
-import org.apache.zeppelin.resource.ResourceSet;
+import org.apache.zeppelin.resource.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,19 +50,22 @@ public class Helium {
 
   private final HeliumBundleFactory bundleFactory;
   private final HeliumApplicationFactory applicationFactory;
+  private final InterpreterSettingManager interpreterSettingManager;
 
   public Helium(
       String heliumConfPath,
       String registryPaths,
       File registryCacheDir,
       HeliumBundleFactory bundleFactory,
-      HeliumApplicationFactory applicationFactory)
+      HeliumApplicationFactory applicationFactory,
+      InterpreterSettingManager interpreterSettingManager)
       throws IOException {
     this.heliumConfPath = heliumConfPath;
     this.registryPaths = registryPaths;
     this.registryCacheDir = registryCacheDir;
     this.bundleFactory = bundleFactory;
     this.applicationFactory = applicationFactory;
+    this.interpreterSettingManager = interpreterSettingManager;
     heliumConf = loadConf(heliumConfPath);
     allPackages = getAllPackageInfo();
   }
@@ -350,7 +356,7 @@ public class Helium {
         allResources = resourcePool.getAll();
       }
     } else {
-      allResources = ResourcePoolUtils.getAllResources();
+      allResources = interpreterSettingManager.getAllResources();
     }
 
     for (List<HeliumPackageSearchResult> pkgs : allPackages.values()) {
@@ -478,4 +484,40 @@ public class Helium {
 
     return mixed;
   }
+
+  public ResourceSet getAllResources() {
+    return getAllResourcesExcept(null);
+  }
+
+  private ResourceSet getAllResourcesExcept(String interpreterGroupExcludsion) 
{
+    ResourceSet resourceSet = new ResourceSet();
+    for (ManagedInterpreterGroup intpGroup : 
interpreterSettingManager.getAllInterpreterGroup()) {
+      if (interpreterGroupExcludsion != null &&
+          intpGroup.getId().equals(interpreterGroupExcludsion)) {
+        continue;
+      }
+
+      RemoteInterpreterProcess remoteInterpreterProcess = 
intpGroup.getRemoteInterpreterProcess();
+      if (remoteInterpreterProcess == null) {
+        ResourcePool localPool = intpGroup.getResourcePool();
+        if (localPool != null) {
+          resourceSet.addAll(localPool.getAll());
+        }
+      } else if (remoteInterpreterProcess.isRunning()) {
+        List<String> resourceList = 
remoteInterpreterProcess.callRemoteFunction(
+            new RemoteInterpreterProcess.RemoteFunction<List<String>>() {
+              @Override
+              public List<String> call(RemoteInterpreterService.Client client) 
throws Exception {
+                return client.resourcePoolGetAll();
+              }
+            }
+        );
+        Gson gson = new Gson();
+        for (String res : resourceList) {
+          resourceSet.add(gson.fromJson(res, Resource.class));
+        }
+      }
+    }
+    return resourceSet;
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java
index 84368a7..f4dfae3 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java
@@ -16,7 +16,6 @@
  */
 package org.apache.zeppelin.helium;
 
-import org.apache.thrift.TException;
 import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
@@ -80,7 +79,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
       try {
         // get interpreter process
         Interpreter intp = paragraph.getRepl(paragraph.getRequiredReplName());
-        InterpreterGroup intpGroup = intp.getInterpreterGroup();
+        ManagedInterpreterGroup intpGroup = (ManagedInterpreterGroup) 
intp.getInterpreterGroup();
         RemoteInterpreterProcess intpProcess = 
intpGroup.getRemoteInterpreterProcess();
         if (intpProcess == null) {
           throw new ApplicationException("Target interpreter process is not 
running");
@@ -105,38 +104,33 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
     private void load(RemoteInterpreterProcess intpProcess, ApplicationState 
appState)
         throws Exception {
 
-      RemoteInterpreterService.Client client = null;
-
       synchronized (appState) {
         if (appState.getStatus() == ApplicationState.Status.LOADED) {
           // already loaded
           return;
         }
 
-        try {
-          appStatusChange(paragraph, appState.getId(), 
ApplicationState.Status.LOADING);
-          String pkgInfo = pkg.toJson();
-          String appId = appState.getId();
-
-          client = intpProcess.getClient();
-          RemoteApplicationResult ret = client.loadApplication(
-              appId,
-              pkgInfo,
-              paragraph.getNote().getId(),
-              paragraph.getId());
-
-          if (ret.isSuccess()) {
-            appStatusChange(paragraph, appState.getId(), 
ApplicationState.Status.LOADED);
-          } else {
-            throw new ApplicationException(ret.getMsg());
-          }
-        } catch (TException e) {
-          intpProcess.releaseBrokenClient(client);
-          throw e;
-        } finally {
-          if (client != null) {
-            intpProcess.releaseClient(client);
-          }
+        appStatusChange(paragraph, appState.getId(), 
ApplicationState.Status.LOADING);
+        final String pkgInfo = pkg.toJson();
+        final String appId = appState.getId();
+
+        RemoteApplicationResult ret = intpProcess.callRemoteFunction(
+            new 
RemoteInterpreterProcess.RemoteFunction<RemoteApplicationResult>() {
+              @Override
+              public RemoteApplicationResult 
call(RemoteInterpreterService.Client client)
+                  throws Exception {
+                return client.loadApplication(
+                    appId,
+                    pkgInfo,
+                    paragraph.getNote().getId(),
+                    paragraph.getId());
+              }
+            }
+        );
+        if (ret.isSuccess()) {
+          appStatusChange(paragraph, appState.getId(), 
ApplicationState.Status.LOADED);
+        } else {
+          throw new ApplicationException(ret.getMsg());
         }
       }
     }
@@ -199,7 +193,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
       }
     }
 
-    private void unload(ApplicationState appsToUnload) throws 
ApplicationException {
+    private void unload(final ApplicationState appsToUnload) throws 
ApplicationException {
       synchronized (appsToUnload) {
         if (appsToUnload.getStatus() != ApplicationState.Status.LOADED) {
           throw new ApplicationException(
@@ -212,31 +206,24 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
         }
 
         RemoteInterpreterProcess intpProcess =
-            intp.getInterpreterGroup().getRemoteInterpreterProcess();
+            ((ManagedInterpreterGroup) 
intp.getInterpreterGroup()).getRemoteInterpreterProcess();
         if (intpProcess == null) {
           throw new ApplicationException("Target interpreter process is not 
running");
         }
 
-        RemoteInterpreterService.Client client;
-        try {
-          client = intpProcess.getClient();
-        } catch (Exception e) {
-          throw new ApplicationException(e);
-        }
-
-        try {
-          RemoteApplicationResult ret = 
client.unloadApplication(appsToUnload.getId());
-
-          if (ret.isSuccess()) {
-            appStatusChange(paragraph, appsToUnload.getId(), 
ApplicationState.Status.UNLOADED);
-          } else {
-            throw new ApplicationException(ret.getMsg());
-          }
-        } catch (TException e) {
-          intpProcess.releaseBrokenClient(client);
-          throw new ApplicationException(e);
-        } finally {
-          intpProcess.releaseClient(client);
+        RemoteApplicationResult ret = intpProcess.callRemoteFunction(
+            new 
RemoteInterpreterProcess.RemoteFunction<RemoteApplicationResult>() {
+              @Override
+              public RemoteApplicationResult 
call(RemoteInterpreterService.Client client)
+                  throws Exception {
+                return client.unloadApplication(appsToUnload.getId());
+              }
+            }
+        );
+        if (ret.isSuccess()) {
+          appStatusChange(paragraph, appsToUnload.getId(), 
ApplicationState.Status.UNLOADED);
+        } else {
+          throw new ApplicationException(ret.getMsg());
         }
       }
     }
@@ -286,7 +273,7 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
       }
     }
 
-    private void run(ApplicationState app) throws ApplicationException {
+    private void run(final ApplicationState app) throws ApplicationException {
       synchronized (app) {
         if (app.getStatus() != ApplicationState.Status.LOADED) {
           throw new ApplicationException(
@@ -299,33 +286,23 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
         }
 
         RemoteInterpreterProcess intpProcess =
-            intp.getInterpreterGroup().getRemoteInterpreterProcess();
+            ((ManagedInterpreterGroup) 
intp.getInterpreterGroup()).getRemoteInterpreterProcess();
         if (intpProcess == null) {
           throw new ApplicationException("Target interpreter process is not 
running");
         }
-        RemoteInterpreterService.Client client = null;
-        try {
-          client = intpProcess.getClient();
-        } catch (Exception e) {
-          throw new ApplicationException(e);
-        }
-
-        try {
-          RemoteApplicationResult ret = client.runApplication(app.getId());
-
-          if (ret.isSuccess()) {
-            // success
-          } else {
-            throw new ApplicationException(ret.getMsg());
-          }
-        } catch (TException e) {
-          intpProcess.releaseBrokenClient(client);
-          client = null;
-          throw new ApplicationException(e);
-        } finally {
-          if (client != null) {
-            intpProcess.releaseClient(client);
-          }
+        RemoteApplicationResult ret = intpProcess.callRemoteFunction(
+            new 
RemoteInterpreterProcess.RemoteFunction<RemoteApplicationResult>() {
+              @Override
+              public RemoteApplicationResult 
call(RemoteInterpreterService.Client client)
+                  throws Exception {
+                return client.runApplication(app.getId());
+              }
+            }
+        );
+        if (ret.isSuccess()) {
+          // success
+        } else {
+          throw new ApplicationException(ret.getMsg());
         }
       }
     }

Reply via email to