ZEPPELIN-3051. Support Interpreter Process Recovery

### What is this PR for?
This PR adds the ability to recover running interpreter processes when the 
Zeppelin server is restarted. It lets an admin restart Zeppelin without 
interrupting the interpreter processes that are currently running, which is 
useful during maintenance or upgrades.

`RecoveryStorage` (an abstract class) stores the information about running 
interpreter processes.
Currently the only implementation that persists anything is 
`FileSystemRecoveryStorage` (the default `NullRecoveryStorage` disables 
recovery); other implementations, such as a ZooKeeper-based one, could be added later. 
`InterpreterLauncher` is the component that recovers the running interpreter 
process.
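
The recovery idea described above can be illustrated with a minimal, self-contained sketch. It is not the code from this PR: `RecoverySketch`, `Endpoint`, and `InMemoryRecoveryStore` are hypothetical names, and the `groupId<TAB>host:port` line format is an assumption loosely modelled on the `python.recovery` file included in this commit. The sketch records a process when it starts, forgets it when it stops, and rebuilds the map from the saved text after a simulated restart.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RecoverySketch {

  /** Hypothetical value type: where a running interpreter process listens. */
  static final class Endpoint {
    final String host;
    final int port;

    Endpoint(String host, int port) {
      this.host = host;
      this.port = port;
    }
  }

  /** Hypothetical in-memory stand-in for a recovery store, keyed by interpreter group id. */
  static final class InMemoryRecoveryStore {
    private final Map<String, Endpoint> running = new LinkedHashMap<>();

    /** Record a newly started interpreter process. */
    void onStart(String groupId, String host, int port) {
      running.put(groupId, new Endpoint(host, port));
    }

    /** Forget an interpreter process that was stopped on purpose. */
    void onStop(String groupId) {
      running.remove(groupId);
    }

    /** Serialize one "groupId<TAB>host:port" line per process (assumed format). */
    String save() {
      StringBuilder sb = new StringBuilder();
      for (Map.Entry<String, Endpoint> e : running.entrySet()) {
        sb.append(e.getKey()).append('\t')
            .append(e.getValue().host).append(':')
            .append(e.getValue().port).append('\n');
      }
      return sb.toString();
    }

    /** Rebuild the groupId -> endpoint map from previously saved text. */
    static Map<String, Endpoint> restore(String data) {
      Map<String, Endpoint> restored = new LinkedHashMap<>();
      for (String line : data.split("\n")) {
        if (line.trim().isEmpty()) {
          continue;
        }
        String[] parts = line.split("\t");
        // The group id may itself contain ':', so split host:port on the last colon.
        int idx = parts[1].lastIndexOf(':');
        String host = parts[1].substring(0, idx);
        int port = Integer.parseInt(parts[1].substring(idx + 1));
        restored.put(parts[0], new Endpoint(host, port));
      }
      return restored;
    }
  }

  public static void main(String[] args) {
    InMemoryRecoveryStore store = new InMemoryRecoveryStore();
    store.onStart("2CZA1DVUG:shared_process", "192.168.3.2", 55410);
    store.onStart("stopped:shared_process", "192.168.3.2", 55411);
    store.onStop("stopped:shared_process");

    // Simulate a server restart: only the saved text survives.
    Map<String, Endpoint> restored = InMemoryRecoveryStore.restore(store.save());
    Endpoint e = restored.get("2CZA1DVUG:shared_process");
    System.out.println(e.host + ":" + e.port); // prints 192.168.3.2:55410
  }
}
```

A process that was stopped before the restart is absent from the restored map, so a launcher consulting the store would start a fresh process for it rather than reconnect.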

Test:
* RecoveryTest.java
* FileSystemRecoveryStorageTest.java

Design Doc:

https://docs.google.com/document/d/1Plm3Hd40aGdNaXmjdsoY4ek3f-gTijTMGMkNjAZN39Y/edit?usp=sharing

### What type of PR is it?
[Feature]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3051

### How should this be tested?
Unit tests and integration tests are added. Also verified manually.

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #2668 from zjffdu/ZEPPELIN-3051 and squashes the following commits:

a4c9b9c [Jeff Zhang] address comments
575b7b9 [Jeff Zhang] fix the pid of interpreter process id
02b118f [Jeff Zhang] address comments
da7cbb9 [Jeff Zhang] ZEPPELIN-3051. Support Interpreter Process Recovery


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/4c8f20ae
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/4c8f20ae
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/4c8f20ae

Branch: refs/heads/master
Commit: 4c8f20ae33ceb47209402c0469791d7a19571471
Parents: 13f8e6c
Author: Jeff Zhang <zjf...@apache.org>
Authored: Tue Dec 5 16:27:12 2017 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Thu Dec 14 16:20:26 2017 +0800

----------------------------------------------------------------------
 bin/interpreter.sh                              |   2 +-
 bin/stop-interpreter.sh                         |  47 ++++++
 bin/zeppelin-daemon.sh                          |  12 --
 conf/zeppelin-site.xml.template                 |  41 +++++
 docs/usage/interpreter/overview.md              |   8 +
 .../src/main/resources/interpreter-setting.json |   2 +-
 .../zeppelin/conf/ZeppelinConfiguration.java    |  17 ++
 .../interpreter/launcher/InterpreterClient.java |  14 +-
 .../launcher/InterpreterLaunchContext.java      |   7 +
 .../launcher/InterpreterLauncher.java           |   5 +-
 .../interpreter/recovery/RecoveryStorage.java   |  80 +++++++++
 zeppelin-server/notebook/.python.recovery.crc   | Bin 0 -> 12 bytes
 zeppelin-server/notebook/python.recovery        |   1 +
 zeppelin-server/pom.xml                         |  15 ++
 .../apache/zeppelin/server/ZeppelinServer.java  |  10 +-
 .../apache/zeppelin/recovery/RecoveryTest.java  | 162 ++++++++++++++++++
 .../zeppelin/rest/AbstractTestRestApi.java      |  13 +-
 .../interpreter/InterpreterSetting.java         |  45 ++++-
 .../interpreter/InterpreterSettingManager.java  |  38 ++++-
 .../interpreter/ManagedInterpreterGroup.java    |  25 ++-
 .../launcher/ShellScriptLauncher.java           |  30 +++-
 .../launcher/SparkInterpreterLauncher.java      |   5 +-
 .../recovery/FileSystemRecoveryStorage.java     | 139 +++++++++++++++
 .../recovery/NullRecoveryStorage.java           |  54 ++++++
 .../interpreter/recovery/StopInterpreter.java   |  40 +++++
 .../interpreter/remote/RemoteInterpreter.java   |  11 +-
 .../remote/RemoteInterpreterManagedProcess.java |   3 +-
 .../remote/RemoteInterpreterProcess.java        |   6 -
 .../remote/RemoteInterpreterRunningProcess.java |  28 +++-
 .../zeppelin/notebook/FileSystemStorage.java    | 168 +++++++++++++++++++
 .../notebook/repo/FileSystemNotebookRepo.java   | 124 +++-----------
 .../apache/zeppelin/util/ReflectionUtils.java   |  99 +++++++++++
 .../interpreter/AbstractInterpreterTest.java    |   2 +-
 .../launcher/ShellScriptLauncherTest.java       |   7 +-
 .../launcher/SparkInterpreterLauncherTest.java  |  31 ++--
 .../recovery/FileSystemRecoveryStorageTest.java |  92 ++++++++++
 36 files changed, 1195 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/bin/interpreter.sh
----------------------------------------------------------------------
diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index 458ffc0..f23ca82 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -220,8 +220,8 @@ if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ -n "${suid}" || -z "${SPARK_SUB
 fi
 
 eval $INTERPRETER_RUN_COMMAND &
-
 pid=$!
+
 if [[ -z "${pid}" ]]; then
   exit 1;
 else

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/bin/stop-interpreter.sh
----------------------------------------------------------------------
diff --git a/bin/stop-interpreter.sh b/bin/stop-interpreter.sh
new file mode 100755
index 0000000..e6ff16e
--- /dev/null
+++ b/bin/stop-interpreter.sh
@@ -0,0 +1,47 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Stop Zeppelin Interpreter Processes
+#
+
+bin=$(dirname "${BASH_SOURCE-$0}")
+bin=$(cd "${bin}">/dev/null; pwd)
+
+. "${bin}/common.sh"
+
+export ZEPPELIN_FORCE_STOP=1
+
+ZEPPELIN_STOP_INTERPRETER_MAIN=org.apache.zeppelin.interpreter.recovery.StopInterpreter
+ZEPPELIN_LOGFILE="${ZEPPELIN_LOG_DIR}/stop-interpreter.log"
+JAVA_OPTS+=" -Dzeppelin.log.file=${ZEPPELIN_LOGFILE}"
+
+if [[ -d "${ZEPPELIN_HOME}/zeppelin-zengine/target/classes" ]]; then
+  ZEPPELIN_CLASSPATH+=":${ZEPPELIN_HOME}/zeppelin-zengine/target/classes"
+fi
+
+if [[ -d "${ZEPPELIN_HOME}/zeppelin-interpreter/target/classes" ]]; then
+  ZEPPELIN_CLASSPATH+=":${ZEPPELIN_HOME}/zeppelin-interpreter/target/classes"
+fi
+
+addJarInDir "${ZEPPELIN_HOME}/zeppelin-interpreter/target/lib"
+addJarInDir "${ZEPPELIN_HOME}/zeppelin-server/target/lib"
+addJarInDir "${ZEPPELIN_HOME}/lib"
+addJarInDir "${ZEPPELIN_HOME}/lib/interpreter"
+
+CLASSPATH+=":${ZEPPELIN_CLASSPATH}"
+$ZEPPELIN_RUNNER $JAVA_OPTS -cp $CLASSPATH $ZEPPELIN_STOP_INTERPRETER_MAIN ${@}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/bin/zeppelin-daemon.sh
----------------------------------------------------------------------
diff --git a/bin/zeppelin-daemon.sh b/bin/zeppelin-daemon.sh
index 5982aee..e898849 100755
--- a/bin/zeppelin-daemon.sh
+++ b/bin/zeppelin-daemon.sh
@@ -217,18 +217,6 @@ function stop() {
       action_msg "${ZEPPELIN_NAME} stop" "${SET_OK}"
     fi
   fi
-
-  # list all pid that used in remote interpreter and kill them
-  for f in ${ZEPPELIN_PID_DIR}/*.pid; do
-    if [[ ! -f ${f} ]]; then
-      continue;
-    fi
-
-    pid=$(cat ${f})
-    wait_for_zeppelin_to_die $pid 20
-    $(rm -f ${f})
-  done
-
 }
 
 function find_zeppelin_process() {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/conf/zeppelin-site.xml.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index 3c5bbea..d566a71 100755
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -480,4 +480,45 @@
   <value>10000:10010</value>
 </property>
 -->
+
+<!--
+<property>
+  <name>zeppelin.interpreter.lifecyclemanager.class</name>
+  <value>org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager</value>
+</property>
+-->
+
+<!--
+<property>
+  <name>zeppelin.interpreter.lifecyclemanager.timeout.checkinterval</name>
+  <value>6000</value>
+  <description>Check interval of interpreter expiration in seconds</description>
+</property>
+-->
+
+<!--
+<property>
+  <name>zeppelin.interpreter.lifecyclemanager.timeout.threshold</name>
+  <value>3600000</value>
+  <description>Threshold of interpreter idle time in seconds, interpeter exceed this threshold will be killed</description>
+</property>
+-->
+
+<!--
+<property>
+  <name>zeppelin.recovery.storage.class</name>
+  <value>org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage</value>
+  <description>ReoveryStorage implementation</description>
+</property>
+-->
+
+<!--
+<property>
+  <name>zeppelin.recovery.dir</name>
+  <value>recovery</value>
+  <description>Location where recovery metadata is stored</description>
+</property>
+-->
+
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/docs/usage/interpreter/overview.md
----------------------------------------------------------------------
diff --git a/docs/usage/interpreter/overview.md b/docs/usage/interpreter/overview.md
index dd5ed22..035c381 100644
--- a/docs/usage/interpreter/overview.md
+++ b/docs/usage/interpreter/overview.md
@@ -144,3 +144,11 @@ So users needs to understand the ([interpreter mode setting ](../usage/interpret
 In this scenario, user need to put `ConfInterpreter` as the first paragraph as the below example. Otherwise the customized setting can not be applied (Actually it would report ERROR)
 <img src="{{BASE_PATH}}/assets/themes/zeppelin/img/screenshots/conf_interpreter.png" width="500px">
 
+
+## Interpreter Process Recovery
+
+Before 0.8.0, shutting down Zeppelin also mean to shutdown all the running interpreter processes. Usually admin will shutdown Zeppelin server for maintenance or upgrade, but don't want to shut down the running interpreter processes.
+In such cases, interpreter process recovery is necessary. Starting from 0.8.0, user can enable interpreter process recovering via setting `zeppelin.recovery.storage.class` as
+`org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage` or other implementations if available in future, by default it is `org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage`
+ which means recovery is not enabled. Enable recover means shutting down Zeppelin would not terminating interpreter process,
+and when Zeppelin is restarted, it would try to reconnect to the existing running interpreter processes. If you want to kill all the interpreter processes after terminating Zeppelin even when recovery is enabled, you can run `bin/stop-interpreter.sh`

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/spark/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/interpreter-setting.json b/spark/src/main/resources/interpreter-setting.json
index 485f695..d656532 100644
--- a/spark/src/main/resources/interpreter-setting.json
+++ b/spark/src/main/resources/interpreter-setting.json
@@ -61,7 +61,7 @@
         "description": "Spark master uri. ex) spark://masterhost:7077",
         "type": "string"
       },
-      "zeppelin.spark.unSupportedVersionCheck": {
+      "zeppelin.spark.enableSupportedVersionCheck": {
         "envName": null,
         "propertyName": "zeppelin.spark.enableSupportedVersionCheck",
         "defaultValue": true,

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 438c661..77279ed 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -355,6 +355,19 @@ public class ZeppelinConfiguration extends XMLConfiguration {
     return getString(ConfVars.ZEPPELIN_NOTEBOOK_DIR);
   }
 
+  public String getRecoveryDir() {
+    return getRelativeDir(ConfVars.ZEPPELIN_RECOVERY_DIR);
+  }
+
+  public String getRecoveryStorageClass() {
+    return getString(ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS);
+  }
+
+  public boolean isRecoveryEnabled() {
+    return !getString(ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS).equals(
+        "org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage");
+  }
+
   public String getUser() {
     return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_USER);
   }
@@ -658,6 +671,10 @@ public class ZeppelinConfiguration extends XMLConfiguration {
     ZEPPELIN_INTERPRETER_OUTPUT_LIMIT("zeppelin.interpreter.output.limit", 1024 * 100),
     ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
     ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
+    ZEPPELIN_RECOVERY_DIR("zeppelin.recovery.dir", "recovery"),
+    ZEPPELIN_RECOVERY_STORAGE_CLASS("zeppelin.recovery.storage.class",
+        "org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage"),
+
     // use specified notebook (id) as homescreen
     ZEPPELIN_NOTEBOOK_HOMESCREEN("zeppelin.notebook.homescreen", null),
     // whether homescreen notebook will be hidden from notebook list or not

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java
index b991079..813dad8 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java
@@ -19,8 +19,20 @@ package org.apache.zeppelin.interpreter.launcher;
 
 /**
  * Interface to InterpreterClient which is created by InterpreterLauncher. This is the component
- * that is used to for the communication fromzeppelin-server process to zeppelin interpreter process
+ * that is used to for the communication from zeppelin-server process to zeppelin interpreter
+ * process.
  */
 public interface InterpreterClient {
 
+  String getInterpreterSettingName();
+
+  void start(String userName, Boolean isUserImpersonate);
+
+  void stop();
+
+  String getHost();
+
+  int getPort();
+
+  boolean isRunning();
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java
index 9e25355..6901e2c 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java
@@ -30,6 +30,7 @@ public class InterpreterLaunchContext {
   private Properties properties;
   private InterpreterOption option;
   private InterpreterRunner runner;
+  private String interpreterGroupId;
   private String interpreterSettingId;
   private String interpreterSettingGroup;
   private String interpreterSettingName;
@@ -37,12 +38,14 @@ public class InterpreterLaunchContext {
   public InterpreterLaunchContext(Properties properties,
                                   InterpreterOption option,
                                   InterpreterRunner runner,
+                                  String interpreterGroupId,
                                   String interpreterSettingId,
                                   String interpreterSettingGroup,
                                   String interpreterSettingName) {
     this.properties = properties;
     this.option = option;
     this.runner = runner;
+    this.interpreterGroupId = interpreterGroupId;
     this.interpreterSettingId = interpreterSettingId;
     this.interpreterSettingGroup = interpreterSettingGroup;
     this.interpreterSettingName = interpreterSettingName;
@@ -60,6 +63,10 @@ public class InterpreterLaunchContext {
     return runner;
   }
 
+  public String getInterpreterGroupId() {
+    return interpreterGroupId;
+  }
+
   public String getInterpreterSettingId() {
     return interpreterSettingId;
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java
index 5d0acf3..1cee20e 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java
@@ -18,6 +18,7 @@
 package org.apache.zeppelin.interpreter.launcher;
 
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
 
 import java.io.IOException;
 import java.util.Properties;
@@ -29,9 +30,11 @@ public abstract class InterpreterLauncher {
 
   protected ZeppelinConfiguration zConf;
   protected Properties properties;
+  protected RecoveryStorage recoveryStorage;
 
-  public InterpreterLauncher(ZeppelinConfiguration zConf) {
+  public InterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
     this.zConf = zConf;
+    this.recoveryStorage = recoveryStorage;
   }
 
   public abstract  InterpreterClient launch(InterpreterLaunchContext context) throws IOException;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryStorage.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryStorage.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryStorage.java
new file mode 100644
index 0000000..8bbe830
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryStorage.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.recovery;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
+
+import java.io.IOException;
+import java.util.Map;
+
+
+/**
+ * Interface for storing interpreter process recovery metadata.
+ *
+ */
+public abstract class RecoveryStorage {
+
+  protected ZeppelinConfiguration zConf;
+  protected Map<String, InterpreterClient> restoredClients;
+
+  public RecoveryStorage(ZeppelinConfiguration zConf) throws IOException {
+    this.zConf = zConf;
+  }
+
+  /**
+   * Update RecoveryStorage when new InterpreterClient is started
+   * @param client
+   * @throws IOException
+   */
+  public abstract void onInterpreterClientStart(InterpreterClient client) throws IOException;
+
+  /**
+   * Update RecoveryStorage when InterpreterClient is stopped
+   * @param client
+   * @throws IOException
+   */
+  public abstract void onInterpreterClientStop(InterpreterClient client) throws IOException;
+
+  /**
+   *
+   * It is only called when Zeppelin Server is started.
+   *
+   * @return
+   * @throws IOException
+   */
+  public abstract Map<String, InterpreterClient> restore() throws IOException;
+
+
+  /**
+   * It is called after constructor
+   *
+   * @throws IOException
+   */
+  public void init() throws IOException {
+    this.restoredClients = restore();
+  }
+
+  public InterpreterClient getInterpreterClient(String interpreterGroupId) {
+    if (restoredClients.containsKey(interpreterGroupId)) {
+      return restoredClients.get(interpreterGroupId);
+    } else {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-server/notebook/.python.recovery.crc
----------------------------------------------------------------------
diff --git a/zeppelin-server/notebook/.python.recovery.crc b/zeppelin-server/notebook/.python.recovery.crc
new file mode 100644
index 0000000..6bd3e7a
Binary files /dev/null and b/zeppelin-server/notebook/.python.recovery.crc differ

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-server/notebook/python.recovery
----------------------------------------------------------------------
diff --git a/zeppelin-server/notebook/python.recovery b/zeppelin-server/notebook/python.recovery
new file mode 100644
index 0000000..eaf4938
--- /dev/null
+++ b/zeppelin-server/notebook/python.recovery
@@ -0,0 +1 @@
+2CZA1DVUG:shared_process       192.168.3.2:55410
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-server/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml
index 08ede29..925c637 100644
--- a/zeppelin-server/pom.xml
+++ b/zeppelin-server/pom.xml
@@ -350,6 +350,21 @@
       </plugin>
 
       <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>${plugin.surefire.version}</version>
+        <configuration combine.children="append">
+          <argLine>-Xmx2g -Xms1g -Dfile.encoding=UTF-8</argLine>
+          <excludes>
+            <exclude>${tests.to.exclude}</exclude>
+          </excludes>
+          <environmentVariables>
+            <ZEPPELIN_FORCE_STOP>1</ZEPPELIN_FORCE_STOP>
+          </environmentVariables>
+        </configuration>
+      </plugin>
+
+
+      <plugin>
         <groupId>org.scala-tools</groupId>
         <artifactId>maven-scala-plugin</artifactId>
         <version>${plugin.scala.version}</version>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index 0b66a43..f8625c2 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -162,7 +162,7 @@ public class ZeppelinServer extends Application {
 
   public static void main(String[] args) throws InterruptedException {
 
-    ZeppelinConfiguration conf = ZeppelinConfiguration.create();
+    final ZeppelinConfiguration conf = ZeppelinConfiguration.create();
     conf.setProperty("args", args);
 
     jettyWebServer = setupJettyServer(conf);
@@ -199,7 +199,9 @@ public class ZeppelinServer extends Application {
         LOG.info("Shutting down Zeppelin Server ... ");
         try {
           jettyWebServer.stop();
-          notebook.getInterpreterSettingManager().close();
+          if (!conf.isRecoveryEnabled()) {
+            ZeppelinServer.notebook.getInterpreterSettingManager().close();
+          }
           notebook.close();
           Thread.sleep(3000);
         } catch (Exception e) {
@@ -222,7 +224,9 @@ public class ZeppelinServer extends Application {
     }
 
     jettyWebServer.join();
-    ZeppelinServer.notebook.getInterpreterSettingManager().close();
+    if (!conf.isRecoveryEnabled()) {
+      ZeppelinServer.notebook.getInterpreterSettingManager().close();
+    }
   }
 
   private static Server setupJettyServer(ZeppelinConfiguration conf) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java
new file mode 100644
index 0000000..37277ee
--- /dev/null
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.recovery;
+
+import com.google.common.io.Files;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.io.FileUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
+import org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage;
+import org.apache.zeppelin.interpreter.recovery.StopInterpreter;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.rest.AbstractTestRestApi;
+import org.apache.zeppelin.scheduler.Job;
+import org.apache.zeppelin.server.ZeppelinServer;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+public class RecoveryTest extends AbstractTestRestApi {
+
+  private Gson gson = new Gson();
+  private static File recoveryDir = null;
+
+  @BeforeClass
+  public static void init() throws Exception {
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS.getVarName(),
+        FileSystemRecoveryStorage.class.getName());
+    recoveryDir = Files.createTempDir();
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_DIR.getVarName(), recoveryDir.getAbsolutePath());
+    startUp(RecoveryTest.class.getSimpleName());
+  }
+
+  @AfterClass
+  public static void destroy() throws Exception {
+    shutDown();
+    FileUtils.deleteDirectory(recoveryDir);
+  }
+
+  @Test
+  public void testRecovery() throws Exception {
+    Note note1 = ZeppelinServer.notebook.createNote(AuthenticationInfo.ANONYMOUS);
+
+    // run python interpreter and create new variable `user`
+    Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    p1.setText("%python user='abc'");
+    PostMethod post = httpPost("/notebook/job/" + note1.getId(), "");
+    assertThat(post, isAllowed());
+    Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(), new TypeToken<Map<String, Object>>() {
+    }.getType());
+    assertEquals(resp.get("status"), "OK");
+    post.releaseConnection();
+    assertEquals(Job.Status.FINISHED, p1.getStatus());
+
+    // shutdown zeppelin and restart it
+    shutDown();
+    startUp(RecoveryTest.class.getSimpleName());
+
+    // run the paragraph again, but change the text to print variable `user`
+    note1 = ZeppelinServer.notebook.getNote(note1.getId());
+    p1 = note1.getParagraph(p1.getId());
+    p1.setText("%python print(user)");
+    post = httpPost("/notebook/job/" + note1.getId(), "");
+    assertEquals(resp.get("status"), "OK");
+    post.releaseConnection();
+    assertEquals(Job.Status.FINISHED, p1.getStatus());
+    assertEquals("abc\n", p1.getResult().message().get(0).getData());
+  }
+
+  @Test
+  public void testRecovery_2() throws Exception {
+    Note note1 = ZeppelinServer.notebook.createNote(AuthenticationInfo.ANONYMOUS);
+
+    // run python interpreter and create new variable `user`
+    Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    p1.setText("%python user='abc'");
+    PostMethod post = httpPost("/notebook/job/" + note1.getId(), "");
+    assertThat(post, isAllowed());
+    Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(), new TypeToken<Map<String, Object>>() {
+    }.getType());
+    assertEquals(resp.get("status"), "OK");
+    post.releaseConnection();
+    assertEquals(Job.Status.FINISHED, p1.getStatus());
+
+    // restart the python interpreter
+    ZeppelinServer.notebook.getInterpreterSettingManager().restart(
+        ((ManagedInterpreterGroup) p1.getBindedInterpreter().getInterpreterGroup())
+            .getInterpreterSetting().getId()
+    );
+
+    // shutdown zeppelin and restart it
+    shutDown();
+    startUp(RecoveryTest.class.getSimpleName());
+
+    // run the paragraph again, but change the text to print variable `user`.
+    // can not recover the python interpreter, because it has been shutdown.
+    note1 = ZeppelinServer.notebook.getNote(note1.getId());
+    p1 = note1.getParagraph(p1.getId());
+    p1.setText("%python print(user)");
+    post = httpPost("/notebook/job/" + note1.getId(), "");
+    assertEquals(resp.get("status"), "OK");
+    post.releaseConnection();
+    assertEquals(Job.Status.ERROR, p1.getStatus());
+  }
+
+  @Test
+  public void testRecovery_3() throws Exception {
+    Note note1 = ZeppelinServer.notebook.createNote(AuthenticationInfo.ANONYMOUS);
+
+    // run python interpreter and create new variable `user`
+    Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    p1.setText("%python user='abc'");
+    PostMethod post = httpPost("/notebook/job/" + note1.getId(), "");
+    assertThat(post, isAllowed());
+    Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(), new TypeToken<Map<String, Object>>() {
+    }.getType());
+    assertEquals("OK", resp.get("status"));
+    post.releaseConnection();
+    assertEquals(Job.Status.FINISHED, p1.getStatus());
+
+    // shut down zeppelin, stop all interpreter processes via StopInterpreter, then restart it
+    shutDown();
+    StopInterpreter.main(new String[]{});
+
+    startUp(RecoveryTest.class.getSimpleName());
+
+    // run the paragraph again, but change the text to print variable `user`.
+    // cannot recover the python interpreter, because it has been shut down.
+    note1 = ZeppelinServer.notebook.getNote(note1.getId());
+    p1 = note1.getParagraph(p1.getId());
+    p1.setText("%python print(user)");
+    post = httpPost("/notebook/job/" + note1.getId(), "");
+    assertEquals(resp.get("status"), "OK");
+    post.releaseConnection();
+    assertEquals(Job.Status.ERROR, p1.getStatus());
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
index 431e364..7c08365 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
@@ -318,8 +318,10 @@ public abstract class AbstractTestRestApi {
     if (!wasRunning) {
       // restart interpreter to stop all interpreter processes
       List<InterpreterSetting> settingList = ZeppelinServer.notebook.getInterpreterSettingManager().get();
-      for (InterpreterSetting setting : settingList) {
-        ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId());
+      if (!ZeppelinServer.notebook.getConf().isRecoveryEnabled()) {
+        for (InterpreterSetting setting : settingList) {
+          ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId());
+        }
       }
       if (shiroIni != null) {
         FileUtils.deleteQuietly(shiroIni);
@@ -350,7 +352,12 @@ public abstract class AbstractTestRestApi {
             .clearProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_ANONYMOUS_ALLOWED.getVarName());
       }
 
-      FileUtils.deleteDirectory(confDir);
+      if (!ZeppelinServer.notebook.getConf().isRecoveryEnabled()) {
+        // Don't delete interpreter.json when recovery is enabled. Otherwise the interpreter
+        // setting id would change after Zeppelin restarts, and the interpreter process could
+        // not be recovered properly.
+        FileUtils.deleteDirectory(confDir);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index d5ff947..424aa27 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -38,6 +38,8 @@ import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher;
 import org.apache.zeppelin.interpreter.launcher.ShellScriptLauncher;
 import org.apache.zeppelin.interpreter.launcher.SparkInterpreterLauncher;
 import org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager;
+import org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage;
+import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
 import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
@@ -144,6 +146,9 @@ public class InterpreterSetting {
 
 
 
+  private transient RecoveryStorage recoveryStorage;
+  ///////////////////////////////////////////////////////////////////////////////////////////
+
   /**
    * Builder class for InterpreterSetting
    */
@@ -242,6 +247,11 @@ public class InterpreterSetting {
       return this;
     }
 
+    public Builder setRecoveryStorage(RecoveryStorage recoveryStorage) {
+      interpreterSetting.recoveryStorage = recoveryStorage;
+      return this;
+    }
+
     public InterpreterSetting create() {
       // post processing
       interpreterSetting.postProcessing();
@@ -261,6 +271,13 @@ public class InterpreterSetting {
     if (this.lifecycleManager == null) {
       this.lifecycleManager = new NullLifecycleManager(conf);
     }
+    if (this.recoveryStorage == null) {
+      try {
+        this.recoveryStorage = new NullRecoveryStorage(conf, interpreterSettingManager);
+      } catch (IOException e) {
+        // ignore this exception as NullRecoveryStorage will do nothing.
+      }
+    }
   }
 
   /**
@@ -285,9 +302,9 @@ public class InterpreterSetting {
 
   private void createLauncher() {
     if (group.equals("spark")) {
-      this.launcher = new SparkInterpreterLauncher(this.conf);
+      this.launcher = new SparkInterpreterLauncher(this.conf, this.recoveryStorage);
     } else {
-      this.launcher = new ShellScriptLauncher(this.conf);
+      this.launcher = new ShellScriptLauncher(this.conf, this.recoveryStorage);
     }
   }
 
@@ -344,6 +361,15 @@ public class InterpreterSetting {
     return this;
   }
 
+  public InterpreterSetting setRecoveryStorage(RecoveryStorage recoveryStorage) {
+    this.recoveryStorage = recoveryStorage;
+    return this;
+  }
+
+  public RecoveryStorage getRecoveryStorage() {
+    return recoveryStorage;
+  }
+
   public LifecycleManager getLifecycleManager() {
     return lifecycleManager;
   }
@@ -408,7 +434,12 @@ public class InterpreterSetting {
   }
 
   void removeInterpreterGroup(String groupId) {
-    this.interpreterGroups.remove(groupId);
+    interpreterGroupWriteLock.lock();
+    try {
+      this.interpreterGroups.remove(groupId);
+    } finally {
+      interpreterGroupWriteLock.unlock();
+    }
   }
 
   public ManagedInterpreterGroup getInterpreterGroup(String user, String noteId) {
@@ -425,7 +456,6 @@ public class InterpreterSetting {
     return interpreterGroups.get(groupId);
   }
 
-  @VisibleForTesting
   public ArrayList<ManagedInterpreterGroup> getAllInterpreterGroups() {
     try {
       interpreterGroupReadLock.lock();
@@ -668,16 +698,19 @@ public class InterpreterSetting {
     return interpreters;
   }
 
-  synchronized RemoteInterpreterProcess createInterpreterProcess(Properties properties)
+  synchronized RemoteInterpreterProcess createInterpreterProcess(String interpreterGroupId,
+                                                                 Properties properties)
       throws IOException {
     if (launcher == null) {
       createLauncher();
     }
     InterpreterLaunchContext launchContext = new
-        InterpreterLaunchContext(properties, option, interpreterRunner, id, group, name);
+        InterpreterLaunchContext(properties, option, interpreterRunner,
+        interpreterGroupId, id, group, name);
     RemoteInterpreterProcess process = (RemoteInterpreterProcess) launcher.launch(launchContext);
     process.setRemoteInterpreterEventPoller(
         new RemoteInterpreterEventPoller(remoteInterpreterProcessListener, appEventListener));
+    recoveryStorage.onInterpreterClientStart(process);
     return process;
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
index 0b7efd5..42f82fa 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
@@ -34,12 +34,16 @@ import org.apache.zeppelin.dep.DependencyResolver;
 import org.apache.zeppelin.display.AngularObjectRegistryListener;
 import org.apache.zeppelin.helium.ApplicationEventListener;
 import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
+import org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage;
+import org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage;
+import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
 import org.apache.zeppelin.resource.Resource;
 import org.apache.zeppelin.resource.ResourcePool;
 import org.apache.zeppelin.resource.ResourceSet;
+import org.apache.zeppelin.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.sonatype.aether.repository.Authentication;
@@ -118,6 +122,7 @@ public class InterpreterSettingManager {
   private ApplicationEventListener appEventListener;
   private DependencyResolver dependencyResolver;
   private LifecycleManager lifecycleManager;
+  private RecoveryStorage recoveryStorage;
 
   public InterpreterSettingManager(ZeppelinConfiguration zeppelinConfiguration,
                                    AngularObjectRegistryListener 
angularObjectRegistryListener,
@@ -154,13 +159,17 @@ public class InterpreterSettingManager {
     this.angularObjectRegistryListener = angularObjectRegistryListener;
     this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
     this.appEventListener = appEventListener;
-    try {
-      this.lifecycleManager = (LifecycleManager)
-          Class.forName(conf.getLifecycleManagerClass()).getConstructor(ZeppelinConfiguration.class)
-              .newInstance(conf);
-    } catch (Exception e) {
-      throw new IOException("Fail to create LifecycleManager", e);
-    }
+
+    this.recoveryStorage = ReflectionUtils.createClazzInstance(conf.getRecoveryStorageClass(),
+        new Class[] {ZeppelinConfiguration.class, InterpreterSettingManager.class},
+        new Object[] {conf, this});
+    this.recoveryStorage.init();
+    LOGGER.info("Using RecoveryStorage: " + this.recoveryStorage.getClass().getName());
+
+    this.lifecycleManager = ReflectionUtils.createClazzInstance(conf.getLifecycleManagerClass(),
+        new Class[] {ZeppelinConfiguration.class},
+        new Object[] {conf});
+    LOGGER.info("Using LifecycleManager: " + this.lifecycleManager.getClass().getName());
 
     init();
   }
@@ -174,6 +183,7 @@ public class InterpreterSettingManager {
         .setAppEventListener(appEventListener)
         .setDependencyResolver(dependencyResolver)
         .setLifecycleManager(lifecycleManager)
+        .setRecoveryStorage(recoveryStorage)
         .postProcessing();
   }
 
@@ -307,8 +317,16 @@ public class InterpreterSettingManager {
     saveToFile();
   }
 
+  public RemoteInterpreterProcessListener getRemoteInterpreterProcessListener() {
+    return remoteInterpreterProcessListener;
+  }
+
+  public ApplicationEventListener getAppEventListener() {
+    return appEventListener;
+  }
+
   private boolean registerInterpreterFromResource(ClassLoader cl, String interpreterDir,
-      String interpreterJson) throws IOException {
+                                                  String interpreterJson) throws IOException {
     URL[] urls = recursiveBuildLibList(new File(interpreterDir));
     ClassLoader tempClassLoader = new URLClassLoader(urls, null);
 
@@ -507,6 +525,10 @@ public class InterpreterSettingManager {
     return resourceSet;
   }
 
+  public RecoveryStorage getRecoveryStorage() {
+    return recoveryStorage;
+  }
+
   public void removeResourcesBelongsToParagraph(String noteId, String paragraphId) {
     for (ManagedInterpreterGroup intpGroup : getAllInterpreterGroup()) {
       ResourceSet resourceSet = new ResourceSet();

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
index 2378f14..641c0ac 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
@@ -55,15 +55,31 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
     return interpreterSetting;
   }
 
-  public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess(Properties properties)
+  public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess(String userName,
+                                                                             Properties properties)
       throws IOException {
     if (remoteInterpreterProcess == null) {
       LOGGER.info("Create InterpreterProcess for InterpreterGroup: " + getId());
-      remoteInterpreterProcess = interpreterSetting.createInterpreterProcess(properties);
+      remoteInterpreterProcess = interpreterSetting.createInterpreterProcess(id, properties);
+      synchronized (remoteInterpreterProcess) {
+        if (!remoteInterpreterProcess.isRunning()) {
+          remoteInterpreterProcess.start(userName, false);
+          remoteInterpreterProcess.getRemoteInterpreterEventPoller()
+              .setInterpreterProcess(remoteInterpreterProcess);
+          remoteInterpreterProcess.getRemoteInterpreterEventPoller().setInterpreterGroup(this);
+          remoteInterpreterProcess.getRemoteInterpreterEventPoller().start();
+          getInterpreterSetting().getRecoveryStorage()
+              .onInterpreterClientStart(remoteInterpreterProcess);
+        }
+      }
     }
     return remoteInterpreterProcess;
   }
 
+  public RemoteInterpreterProcess getInterpreterProcess() {
+    return remoteInterpreterProcess;
+  }
+
   public RemoteInterpreterProcess getRemoteInterpreterProcess() {
     return remoteInterpreterProcess;
   }
@@ -94,6 +110,11 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
       if (remoteInterpreterProcess != null) {
         LOGGER.info("Kill RemoteInterpreterProcess");
         remoteInterpreterProcess.stop();
+        try {
+          interpreterSetting.getRecoveryStorage().onInterpreterClientStop(remoteInterpreterProcess);
+        } catch (IOException e) {
+          LOGGER.error("Fail to store recovery data", e);
+        }
         remoteInterpreterProcess = null;
       }
     }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java
index 8c86129..6ddcacf 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java
@@ -21,50 +21,68 @@ package org.apache.zeppelin.interpreter.launcher;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.InterpreterOption;
 import org.apache.zeppelin.interpreter.InterpreterRunner;
+import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
 /**
  * Interpreter Launcher which use shell script to launch the interpreter process.
- *
  */
 public class ShellScriptLauncher extends InterpreterLauncher {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ShellScriptLauncher.class);
 
-  public ShellScriptLauncher(ZeppelinConfiguration zConf) {
-    super(zConf);
+  public ShellScriptLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
+    super(zConf, recoveryStorage);
   }
 
   @Override
-  public InterpreterClient launch(InterpreterLaunchContext context) {
+  public InterpreterClient launch(InterpreterLaunchContext context) throws IOException {
     LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup());
     this.properties = context.getProperties();
     InterpreterOption option = context.getOption();
     InterpreterRunner runner = context.getRunner();
     String groupName = context.getInterpreterSettingGroup();
     String name = context.getInterpreterSettingName();
-
     int connectTimeout =
         zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
+
     if (option.isExistingProcess()) {
       return new RemoteInterpreterRunningProcess(
+          context.getInterpreterSettingName(),
           connectTimeout,
           option.getHost(),
           option.getPort());
     } else {
+      // try to recover it first
+      if (zConf.isRecoveryEnabled()) {
+        InterpreterClient recoveredClient =
+            recoveryStorage.getInterpreterClient(context.getInterpreterGroupId());
+        if (recoveredClient != null) {
+          if (recoveredClient.isRunning()) {
+            LOGGER.info("Recover interpreter process: " + recoveredClient.getHost() + ":" +
+                recoveredClient.getPort());
+            return recoveredClient;
+          } else {
+            LOGGER.warn("Cannot recover interpreter process: " + recoveredClient.getHost() + ":"
+                + recoveredClient.getPort() + ", as it is already terminated.");
+          }
+        }
+      }
+
       // create new remote process
       String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/"
           + context.getInterpreterSettingId();
       return new RemoteInterpreterManagedProcess(
           runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(),
-          zConf.getCallbackPortRange(),  zConf.getInterpreterPortRange(),
+          zConf.getCallbackPortRange(), zConf.getInterpreterPortRange(),
           zConf.getInterpreterDir() + "/" + groupName, localRepoPath,
           buildEnvFromProperties(), connectTimeout, name);
     }
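
The recover-first branch added to `launch` in the hunk above amounts to a small decision rule: reuse a recorded client only when recovery is enabled and the recorded process still reports itself as running; otherwise fall through to a fresh launch. The following is a minimal sketch of that rule, not the Zeppelin implementation. `Client`, `FixedClient` and `recoverOrLaunch` are hypothetical names introduced for illustration; they stand in for `InterpreterClient`, a test double, and the body of the launcher respectively.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class RecoverOrLaunchSketch {

  /** Hypothetical stand-in for InterpreterClient; only the liveness check is modelled. */
  public interface Client {
    boolean isRunning();
  }

  /** Hypothetical test double whose liveness is fixed at construction time. */
  public static final class FixedClient implements Client {
    private final boolean running;

    public FixedClient(boolean running) {
      this.running = running;
    }

    @Override
    public boolean isRunning() {
      return running;
    }
  }

  /**
   * Returns the recorded client for groupId when recovery is enabled and that
   * client is still running; otherwise asks freshLauncher for a new client.
   */
  public static Client recoverOrLaunch(boolean recoveryEnabled,
                                       Map<String, Client> recorded,
                                       String groupId,
                                       Supplier<Client> freshLauncher) {
    if (recoveryEnabled) {
      Client candidate = recorded.get(groupId);
      if (candidate != null && candidate.isRunning()) {
        return candidate;
      }
      // A recorded but terminated process is skipped, as in the WARN branch of the diff.
    }
    return freshLauncher.get();
  }

  public static void main(String[] args) {
    Client alive = new FixedClient(true);
    Client dead = new FixedClient(false);
    Client fresh = new FixedClient(true);

    Map<String, Client> recorded = new HashMap<>();
    recorded.put("group-alive", alive);
    recorded.put("group-dead", dead);

    // Reused only in the first case; every other case gets the fresh client.
    System.out.println(recoverOrLaunch(true, recorded, "group-alive", () -> fresh) == alive);
    System.out.println(recoverOrLaunch(true, recorded, "group-dead", () -> fresh) == fresh);
    System.out.println(recoverOrLaunch(false, recorded, "group-alive", () -> fresh) == fresh);
  }
}
```

Passing the fresh launch as a `Supplier` keeps the sketch free of process-spawning side effects; in the diff itself the fallback is the `RemoteInterpreterManagedProcess` constructor call.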

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
index 32a0530..e8a9cdf 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
@@ -19,6 +19,7 @@ package org.apache.zeppelin.interpreter.launcher;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,8 +36,8 @@ public class SparkInterpreterLauncher extends ShellScriptLauncher {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterLauncher.class);
 
-  public SparkInterpreterLauncher(ZeppelinConfiguration zConf) {
-    super(zConf);
+  public SparkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
+    super(zConf, recoveryStorage);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java
new file mode 100644
index 0000000..5a0c8ad
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.recovery;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.interpreter.InterpreterSettingManager;
+import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
+import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
+import org.apache.zeppelin.notebook.FileSystemStorage;
+import org.apache.zeppelin.notebook.repo.FileSystemNotebookRepo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Hadoop compatible FileSystem based RecoveryStorage implementation.
+ *
+ * Save InterpreterProcess in the format of:
+ * InterpreterGroupId host:port
+ */
+public class FileSystemRecoveryStorage extends RecoveryStorage {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemRecoveryStorage.class);
+
+  private InterpreterSettingManager interpreterSettingManager;
+  private FileSystemStorage fs;
+  private Path recoveryDir;
+
+  public FileSystemRecoveryStorage(ZeppelinConfiguration zConf,
+                                   InterpreterSettingManager interpreterSettingManager)
+      throws IOException {
+    super(zConf);
+    this.interpreterSettingManager = interpreterSettingManager;
+    this.zConf = zConf;
+    this.fs = FileSystemStorage.get(zConf);
+    this.recoveryDir = this.fs.makeQualified(new Path(zConf.getRecoveryDir()));
+    LOGGER.info("Using folder {} to store recovery data", recoveryDir);
+    this.fs.tryMkDir(recoveryDir);
+  }
+
+  @Override
+  public void onInterpreterClientStart(InterpreterClient client) throws IOException {
+    save(client.getInterpreterSettingName());
+  }
+
+  @Override
+  public void onInterpreterClientStop(InterpreterClient client) throws IOException {
+    save(client.getInterpreterSettingName());
+  }
+
+  private void save(String interpreterSettingName) throws IOException {
+    InterpreterSetting interpreterSetting =
+        interpreterSettingManager.getInterpreterSettingByName(interpreterSettingName);
+    List<String> recoveryContent = new ArrayList<>();
+    for (ManagedInterpreterGroup interpreterGroup : interpreterSetting.getAllInterpreterGroups()) {
+      RemoteInterpreterProcess interpreterProcess = interpreterGroup.getInterpreterProcess();
+      if (interpreterProcess != null) {
+        recoveryContent.add(interpreterGroup.getId() + "\t" + interpreterProcess.getHost() + ":" +
+            interpreterProcess.getPort());
+      }
+    }
+    LOGGER.debug("Updating recovery data for interpreterSetting: " + interpreterSettingName);
+    LOGGER.debug("Recovery Data: " + StringUtils.join(recoveryContent, System.lineSeparator()));
+    Path recoveryFile = new Path(recoveryDir, interpreterSettingName + ".recovery");
+    fs.writeFile(StringUtils.join(recoveryContent, System.lineSeparator()), recoveryFile, true);
+  }
+
+  @Override
+  public Map<String, InterpreterClient> restore() throws IOException {
+    Map<String, InterpreterClient> clients = new HashMap<>();
+    List<Path> paths = fs.list(new Path(recoveryDir + "/*.recovery"));
+
+    for (Path path : paths) {
+      String fileName = path.getName();
+      String interpreterSettingName = fileName.substring(0,
+          fileName.length() - ".recovery".length());
+      String recoveryContent = fs.readFile(path);
+      if (!StringUtils.isBlank(recoveryContent)) {
+        for (String line : recoveryContent.split(System.lineSeparator())) {
+          String[] tokens = line.split("\t");
+          String groupId = tokens[0];
+          String[] hostPort = tokens[1].split(":");
+          int connectTimeout =
+              zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
+          RemoteInterpreterRunningProcess client = new RemoteInterpreterRunningProcess(
+              interpreterSettingName, connectTimeout, hostPort[0], Integer.parseInt(hostPort[1]));
+          // interpreterSettingManager may be null when this class is used by
+          // stop-interpreter.sh
+          if (interpreterSettingManager != null) {
+            client.setRemoteInterpreterEventPoller(new RemoteInterpreterEventPoller(
+                interpreterSettingManager.getRemoteInterpreterProcessListener(),
+                interpreterSettingManager.getAppEventListener()));
+          }
+          clients.put(groupId, client);
+          LOGGER.info("Recovering Interpreter Process: " + hostPort[0] + ":" + hostPort[1]);
+        }
+      }
+    }
+
+    return clients;
+  }
+}
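
`save()` and `restore()` above agree on a simple line format: one record per interpreter group, written as the group id, a tab, then `host:port`. The sketch below round-trips that format in isolation, under stated assumptions. `RecoveryFileSketch`, `Endpoint`, `format` and `parse` are hypothetical names that do not appear in the commit, and the group ids are made up. The parser here is deliberately stricter than `restore()`: it rejects malformed lines and splits on the last colon, which is a choice of this sketch rather than behaviour of the diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RecoveryFileSketch {

  /** Hypothetical value type for one recorded endpoint; not a Zeppelin class. */
  public static final class Endpoint {
    public final String host;
    public final int port;

    public Endpoint(String host, int port) {
      this.host = host;
      this.port = port;
    }
  }

  /** Formats one record the way save() does: group id, a tab, then host:port. */
  public static String format(String groupId, String host, int port) {
    return groupId + "\t" + host + ":" + port;
  }

  /** Parses recovery content into groupId -> endpoint, skipping blank lines. */
  public static Map<String, Endpoint> parse(String content) {
    Map<String, Endpoint> endpoints = new LinkedHashMap<>();
    if (content == null || content.trim().isEmpty()) {
      return endpoints;
    }
    // \R matches any line break, so files written on another platform still parse.
    for (String line : content.split("\\R")) {
      if (line.trim().isEmpty()) {
        continue;
      }
      String[] tokens = line.split("\t");
      if (tokens.length != 2) {
        throw new IllegalArgumentException("Malformed recovery line: " + line);
      }
      // Split on the last colon so the port is always the final component.
      int colon = tokens[1].lastIndexOf(':');
      if (colon < 0) {
        throw new IllegalArgumentException("Missing port in recovery line: " + line);
      }
      String host = tokens[1].substring(0, colon);
      int port = Integer.parseInt(tokens[1].substring(colon + 1));
      endpoints.put(tokens[0], new Endpoint(host, port));
    }
    return endpoints;
  }

  public static void main(String[] args) {
    // Illustrative group ids and ports only.
    String content = format("group-a", "localhost", 30001)
        + System.lineSeparator()
        + format("group-b", "10.0.0.5", 30002);
    for (Map.Entry<String, Endpoint> entry : parse(content).entrySet()) {
      Endpoint endpoint = entry.getValue();
      System.out.println(entry.getKey() + " -> " + endpoint.host + ":" + endpoint.port);
    }
  }
}
```

Because each `.recovery` file is rewritten in full on every start and stop event, a reader only ever needs to handle a complete snapshot, which is why a line-oriented parse like this is enough.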

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java
new file mode 100644
index 0000000..3a7d12c
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.recovery;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterSettingManager;
+import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
+
+import java.io.IOException;
+import java.util.Map;
+
+
+/**
+ * RecoveryStorage that does nothing, used when recovery is not enabled.
+ *
+ */
+public class NullRecoveryStorage extends RecoveryStorage {
+
+  public NullRecoveryStorage(ZeppelinConfiguration zConf,
+                             InterpreterSettingManager interpreterSettingManager)
+      throws IOException {
+    super(zConf);
+  }
+
+  @Override
+  public void onInterpreterClientStart(InterpreterClient client) throws IOException {
+
+  }
+
+  @Override
+  public void onInterpreterClientStop(InterpreterClient client) throws IOException {
+
+  }
+
+  @Override
+  public Map<String, InterpreterClient> restore() throws IOException {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java
new file mode 100644
index 0000000..d74b162
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java
@@ -0,0 +1,40 @@
+package org.apache.zeppelin.interpreter.recovery;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterSettingManager;
+import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
+import org.apache.zeppelin.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+
+/**
+ * Utility class for stopping interpreters when you want to stop all
+ * interpreter processes even though recovery is enabled, or when you want to kill
+ * interpreter processes to avoid orphan processes.
+ */
+public class StopInterpreter {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(StopInterpreter.class);
+
+  public static void main(String[] args) throws IOException {
+    ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
+    RecoveryStorage recoveryStorage = null;
+
+    recoveryStorage = 
ReflectionUtils.createClazzInstance(zConf.getRecoveryStorageClass(),
+        new Class[] {ZeppelinConfiguration.class, 
InterpreterSettingManager.class},
+        new Object[] {zConf, null});
+
+    LOGGER.info("Using RecoveryStorage: " + 
recoveryStorage.getClass().getName());
+    Map<String, InterpreterClient> restoredClients = recoveryStorage.restore();
+    if (restoredClients != null) {
+      for (InterpreterClient client : restoredClients.values()) {
+        LOGGER.info("Stop Interpreter Process: " + client.getHost() + ":" + client.getPort());
+        client.stop();
+      }
+    }
+  }
+}
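
The restore-then-stop loop in StopInterpreter.main can be illustrated in isolation. The following is a minimal sketch, not the committed code: `Client` and `FakeClient` are hypothetical stand-ins for `InterpreterClient`, and the map is built by hand instead of coming from a `RecoveryStorage`. It only shows that a null result from `restore()` is treated as "nothing to stop" and that every restored client is stopped once.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class StopAllSketch {

  /** Hypothetical stand-in for InterpreterClient; only the methods the loop needs. */
  interface Client {
    String getHost();
    int getPort();
    void stop();
  }

  /** Hypothetical client that only records whether stop() was called. */
  static final class FakeClient implements Client {
    private final String host;
    private final int port;
    boolean stopped = false;

    FakeClient(String host, int port) {
      this.host = host;
      this.port = port;
    }

    @Override public String getHost() { return host; }
    @Override public int getPort() { return port; }
    @Override public void stop() { stopped = true; }
  }

  /** Stops every restored client; a null map means nothing was recovered. */
  static int stopAll(Map<String, ? extends Client> restoredClients) {
    if (restoredClients == null) {
      return 0;
    }
    int count = 0;
    for (Client client : restoredClients.values()) {
      System.out.println("Stop Interpreter Process: "
          + client.getHost() + ":" + client.getPort());
      client.stop();
      count++;
    }
    return count;
  }

  public static void main(String[] args) {
    Map<String, FakeClient> restored = new LinkedHashMap<>();
    restored.put("spark", new FakeClient("localhost", 30001));
    restored.put("python", new FakeClient("localhost", 30002));
    System.out.println("stopped: " + stopAll(restored));
  }
}
```

Note that in the committed code a restored RemoteInterpreterRunningProcess only shuts the remote process down when the ZEPPELIN_FORCE_STOP environment variable is set; the sketch does not model that gate.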

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 6defd9b..bda8010 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -102,16 +102,7 @@ public class RemoteInterpreter extends Interpreter {
       return this.interpreterProcess;
     }
     ManagedInterpreterGroup intpGroup = getInterpreterGroup();
-    this.interpreterProcess = intpGroup.getOrCreateInterpreterProcess(properties);
-    synchronized (interpreterProcess) {
-      if (!interpreterProcess.isRunning()) {
-        interpreterProcess.start(this.getUserName(), false);
-        interpreterProcess.getRemoteInterpreterEventPoller()
-            .setInterpreterProcess(interpreterProcess);
-        interpreterProcess.getRemoteInterpreterEventPoller().setInterpreterGroup(intpGroup);
-        interpreterProcess.getRemoteInterpreterEventPoller().start();
-      }
-    }
+    this.interpreterProcess = intpGroup.getOrCreateInterpreterProcess(getUserName(), properties);
     return interpreterProcess;
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index 27e826c..3dd5bfa 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -214,7 +214,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
       callbackServer.stop();
     }
     if (isRunning()) {
-      logger.info("kill interpreter process");
+      logger.info("Kill interpreter process");
       try {
         callRemoteFunction(new RemoteFunction<Void>() {
           @Override
@@ -263,7 +263,6 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
     return interpreterDir;
   }
 
-  @VisibleForTesting
   public String getInterpreterSettingName() {
     return interpreterSettingName;
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index 88cc489..08653ae 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -51,12 +51,6 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient {
     this.remoteInterpreterEventPoller = eventPoller;
   }
 
-  public abstract String getHost();
-  public abstract int getPort();
-  public abstract void start(String userName, Boolean isUserImpersonate);
-  public abstract void stop();
-  public abstract boolean isRunning();
-
   public int getConnectTimeout() {
     return connectTimeout;
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
index d8715a0..0e87e4f 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
@@ -17,6 +17,7 @@
 package org.apache.zeppelin.interpreter.remote;
 
 import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,13 +28,16 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
   private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class);
   private final String host;
   private final int port;
+  private final String interpreterSettingName;
 
   public RemoteInterpreterRunningProcess(
+      String interpreterSettingName,
       int connectTimeout,
       String host,
       int port
   ) {
     super(connectTimeout);
+    this.interpreterSettingName = interpreterSettingName;
     this.host = host;
     this.port = port;
   }
@@ -49,13 +53,35 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
   }
 
   @Override
+  public String getInterpreterSettingName() {
+    return interpreterSettingName;
+  }
+
+  @Override
   public void start(String userName, Boolean isUserImpersonate) {
     // assume process is externally managed. nothing to do
   }
 
   @Override
   public void stop() {
-    // assume process is externally managed. nothing to do
+    // assume process is externally managed. nothing to do. But will kill it
+    // when you want to force stop it. ENV ZEPPELIN_FORCE_STOP control that.
+    if (System.getenv("ZEPPELIN_FORCE_STOP") != null) {
+      if (isRunning()) {
+        logger.info("Kill interpreter process");
+        try {
+          callRemoteFunction(new RemoteFunction<Void>() {
+            @Override
+            public Void call(RemoteInterpreterService.Client client) throws Exception {
+              client.shutdown();
+              return null;
+            }
+          });
+        } catch (Exception e) {
+          logger.warn("ignore the exception when shutting down interpreter process.", e);
+        }
+      }
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/FileSystemStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/FileSystemStorage.java
new file mode 100644
index 0000000..6f3d3f9
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/FileSystemStorage.java
@@ -0,0 +1,168 @@
+package org.apache.zeppelin.notebook;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Hadoop FileSystem wrapper. Support both secure and no-secure mode
+ */
+public class FileSystemStorage {
+
+  private static Logger LOGGER = LoggerFactory.getLogger(FileSystemStorage.class);
+
+  private static FileSystemStorage instance;
+
+  private ZeppelinConfiguration zConf;
+  private Configuration hadoopConf;
+  private boolean isSecurityEnabled = false;
+  private FileSystem fs;
+
+  private FileSystemStorage(ZeppelinConfiguration zConf) throws IOException {
+    this.zConf = zConf;
+    this.hadoopConf = new Configuration();
+    this.hadoopConf.set("fs.file.impl", RawLocalFileSystem.class.getName());
+    this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
+
+    if (isSecurityEnabled) {
+      String keytab = zConf.getString(
+          ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_KEYTAB);
+      String principal = zConf.getString(
+          ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_PRINCIPAL);
+      if (StringUtils.isBlank(keytab) || StringUtils.isBlank(principal)) {
+        throw new IOException("keytab and principal can not be empty, keytab: " + keytab
+            + ", principal: " + principal);
+      }
+      UserGroupInformation.loginUserFromKeytab(principal, keytab);
+    }
+
+    try {
+      this.fs = FileSystem.get(new URI(zConf.getNotebookDir()), this.hadoopConf);
+      LOGGER.info("Creating FileSystem: " + this.fs.getClass().getCanonicalName());
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public static synchronized FileSystemStorage get(ZeppelinConfiguration zConf) throws IOException {
+    if (instance == null) {
+      instance = new FileSystemStorage(zConf);
+    }
+    return instance;
+  }
+
+  public Path makeQualified(Path path) {
+    return fs.makeQualified(path);
+  }
+
+  public void tryMkDir(final Path dir) throws IOException {
+    callHdfsOperation(new HdfsOperation<Void>() {
+      @Override
+      public Void call() throws IOException {
+        if (!fs.exists(dir)) {
+          fs.mkdirs(dir);
+          LOGGER.info("Create dir {} in hdfs", dir.toString());
+        }
+        if (fs.isFile(dir)) {
+          throw new IOException(dir.toString() + " is file instead of directory, please remove " +
+              "it or specify another directory");
+        }
+        fs.mkdirs(dir);
+        return null;
+      }
+    });
+  }
+
+  public List<Path> list(final Path path) throws IOException {
+    return callHdfsOperation(new HdfsOperation<List<Path>>() {
+      @Override
+      public List<Path> call() throws IOException {
+        List<Path> paths = new ArrayList<>();
+        for (FileStatus status : fs.globStatus(path)) {
+          paths.add(status.getPath());
+        }
+        return paths;
+      }
+    });
+  }
+
+  public boolean delete(final Path path) throws IOException {
+    return callHdfsOperation(new HdfsOperation<Boolean>() {
+      @Override
+      public Boolean call() throws IOException {
+        return fs.delete(path, true);
+      }
+    });
+  }
+
+  public String readFile(final Path file) throws IOException {
+    return callHdfsOperation(new HdfsOperation<String>() {
+      @Override
+      public String call() throws IOException {
+        LOGGER.debug("Read from file: " + file);
+        ByteArrayOutputStream noteBytes = new ByteArrayOutputStream();
+        IOUtils.copyBytes(fs.open(file), noteBytes, hadoopConf);
+        return new String(noteBytes.toString(
+            zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING)));
+      }
+    });
+  }
+
+  public void writeFile(final String content, final Path file, boolean writeTempFileFirst)
+      throws IOException {
+    callHdfsOperation(new HdfsOperation<Void>() {
+      @Override
+      public Void call() throws IOException {
+        InputStream in = new ByteArrayInputStream(content.getBytes(
+            zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING)));
+        Path tmpFile = new Path(file.toString() + ".tmp");
+        IOUtils.copyBytes(in, fs.create(tmpFile), hadoopConf);
+        fs.delete(file, true);
+        fs.rename(tmpFile, file);
+        return null;
+      }
+    });
+  }
+
+  private interface HdfsOperation<T> {
+    T call() throws IOException;
+  }
+
+  public synchronized <T> T callHdfsOperation(final HdfsOperation<T> func) throws IOException {
+    if (isSecurityEnabled) {
+      UserGroupInformation.getLoginUser().reloginFromKeytab();
+      try {
+        return UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<T>() {
+          @Override
+          public T run() throws Exception {
+            return func.call();
+          }
+        });
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    } else {
+      return func.call();
+    }
+  }
+
+}
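
FileSystemStorage.writeFile writes the content to a sibling ".tmp" file and then renames it over the target, so a reader is less likely to see a half-written file. The following is a minimal sketch of that write-temp-then-rename pattern on the local file system. It is an assumption-laden illustration: it uses java.nio.file instead of the Hadoop FileSystem API, hard-codes UTF-8 instead of reading ZEPPELIN_ENCODING, and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class TempThenRenameSketch {

  /** Writes content to "<file>.tmp" first, then moves it over the target file. */
  static void writeFile(String content, Path file) throws IOException {
    Path tmpFile = file.resolveSibling(file.getFileName() + ".tmp");
    Files.write(tmpFile, content.getBytes(StandardCharsets.UTF_8));
    // REPLACE_EXISTING plays the role of the delete-then-rename pair in the diff.
    Files.move(tmpFile, file, StandardCopyOption.REPLACE_EXISTING);
  }

  /** Reads the whole file back as a UTF-8 string. */
  static String readFile(Path file) throws IOException {
    return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("recovery-sketch");
    Path target = dir.resolve("spark.recovery");
    writeFile("first", target);
    writeFile("second", target);
    System.out.println(readFile(target));
  }
}
```

In the committed method the `writeTempFileFirst` parameter is accepted but the temp-file path is taken unconditionally, and the target is deleted before the rename, which leaves a short window in which neither file is in place.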

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/FileSystemNotebookRepo.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/FileSystemNotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/FileSystemNotebookRepo.java
index ba858e6..d8ec0e5 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/FileSystemNotebookRepo.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/FileSystemNotebookRepo.java
@@ -8,6 +8,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.notebook.FileSystemStorage;
 import org.apache.zeppelin.notebook.Note;
 import org.apache.zeppelin.notebook.NoteInfo;
 import org.apache.zeppelin.user.AuthenticationInfo;
@@ -37,108 +38,45 @@ import java.util.Map;
 public class FileSystemNotebookRepo implements NotebookRepo {
   private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemNotebookRepo.class);
 
-  private Configuration hadoopConf;
-  private ZeppelinConfiguration zConf;
-  private boolean isSecurityEnabled = false;
-  private FileSystem fs;
+  private FileSystemStorage fs;
   private Path notebookDir;
 
   public FileSystemNotebookRepo(ZeppelinConfiguration zConf) throws IOException {
-    this.zConf = zConf;
-    this.hadoopConf = new Configuration();
-
-    this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
-    if (isSecurityEnabled) {
-      String keytab = zConf.getString(
-          ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_KEYTAB);
-      String principal = zConf.getString(
-          ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_PRINCIPAL);
-      if (StringUtils.isBlank(keytab) || StringUtils.isBlank(principal)) {
-        throw new IOException("keytab and principal can not be empty, keytab: " + keytab
-            + ", principal: " + principal);
-      }
-      UserGroupInformation.loginUserFromKeytab(principal, keytab);
-    }
+    this.fs = FileSystemStorage.get(zConf);
+    this.notebookDir = this.fs.makeQualified(new Path(zConf.getNotebookDir()));
+    LOGGER.info("Using folder {} to store notebook", notebookDir);
+    this.fs.tryMkDir(notebookDir);
 
-    try {
-      this.fs = FileSystem.get(new URI(zConf.getNotebookDir()), new Configuration());
-      LOGGER.info("Creating FileSystem: " + this.fs.getClass().getCanonicalName());
-      this.notebookDir = fs.makeQualified(new Path(zConf.getNotebookDir()));
-      LOGGER.info("Using folder {} to store notebook", notebookDir);
-    } catch (URISyntaxException e) {
-      throw new IOException(e);
-    }
-    if (!fs.exists(notebookDir)) {
-      fs.mkdirs(notebookDir);
-      LOGGER.info("Create notebook dir {} in hdfs", notebookDir.toString());
-    }
-    if (fs.isFile(notebookDir)) {
-      throw new IOException("notebookDir {} is file instead of directory, please remove it or " +
-          "specify another directory");
-    }
   }
 
   @Override
   public List<NoteInfo> list(AuthenticationInfo subject) throws IOException {
-    return callHdfsOperation(new HdfsOperation<List<NoteInfo>>() {
-      @Override
-      public List<NoteInfo> call() throws IOException {
-        List<NoteInfo> noteInfos = new ArrayList<>();
-        for (FileStatus status : fs.globStatus(new Path(notebookDir, "*/note.json"))) {
-          NoteInfo noteInfo = new NoteInfo(status.getPath().getParent().getName(), "", null);
-          noteInfos.add(noteInfo);
-        }
-        return noteInfos;
-      }
-    });
+    List<Path> notePaths = fs.list(new Path(notebookDir, "*/note.json"));
+    List<NoteInfo> noteInfos = new ArrayList<>();
+    for (Path path : notePaths) {
+      NoteInfo noteInfo = new NoteInfo(path.getParent().getName(), "", null);
+      noteInfos.add(noteInfo);
+    }
+    return noteInfos;
   }
 
   @Override
   public Note get(final String noteId, AuthenticationInfo subject) throws IOException {
-    return callHdfsOperation(new HdfsOperation<Note>() {
-      @Override
-      public Note call() throws IOException {
-        Path notePath = new Path(notebookDir.toString() + "/" + noteId + "/note.json");
-        LOGGER.debug("Read note from file: " + notePath);
-        ByteArrayOutputStream noteBytes = new ByteArrayOutputStream();
-        IOUtils.copyBytes(fs.open(notePath), noteBytes, hadoopConf);
-        return Note.fromJson(new String(noteBytes.toString(
-            zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING))));
-      }
-    });
+    String content = this.fs.readFile(
+        new Path(notebookDir.toString() + "/" + noteId + "/note.json"));
+    return Note.fromJson(content);
   }
 
   @Override
   public void save(final Note note, AuthenticationInfo subject) throws IOException {
-    callHdfsOperation(new HdfsOperation<Void>() {
-      @Override
-      public Void call() throws IOException {
-        Path notePath = new Path(notebookDir.toString() + "/" + note.getId() + "/note.json");
-        Path tmpNotePath = new Path(notebookDir.toString() + "/" + note.getId() + "/.note.json");
-        LOGGER.debug("Saving note to file: " + notePath);
-        if (fs.exists(tmpNotePath)) {
-          fs.delete(tmpNotePath, true);
-        }
-        InputStream in = new ByteArrayInputStream(note.toJson().getBytes(
-            zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING)));
-        IOUtils.copyBytes(in, fs.create(tmpNotePath), hadoopConf);
-        fs.delete(notePath, true);
-        fs.rename(tmpNotePath, notePath);
-        return null;
-      }
-    });
+    this.fs.writeFile(note.toJson(),
+        new Path(notebookDir.toString() + "/" + note.getId() + "/note.json"),
+        true);
   }
 
   @Override
   public void remove(final String noteId, AuthenticationInfo subject) throws IOException {
-    callHdfsOperation(new HdfsOperation<Void>() {
-      @Override
-      public Void call() throws IOException {
-        Path noteFolder = new Path(notebookDir.toString() + "/" + noteId);
-        fs.delete(noteFolder, true);
-        return null;
-      }
-    });
+    this.fs.delete(new Path(notebookDir.toString() + "/" + noteId));
   }
 
   @Override
@@ -182,26 +120,4 @@ public class FileSystemNotebookRepo implements NotebookRepo {
   public void updateSettings(Map<String, String> settings, AuthenticationInfo subject) {
     LOGGER.warn("updateSettings is not implemented for HdfsNotebookRepo");
   }
-
-  private interface HdfsOperation<T> {
-    T call() throws IOException;
-  }
-
-  public synchronized <T> T callHdfsOperation(final HdfsOperation<T> func) throws IOException {
-    if (isSecurityEnabled) {
-      UserGroupInformation.getLoginUser().reloginFromKeytab();
-      try {
-        return UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<T>() {
-          @Override
-          public T run() throws Exception {
-            return func.call();
-          }
-        });
-      } catch (InterruptedException e) {
-        throw new IOException(e);
-      }
-    } else {
-      return func.call();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/ReflectionUtils.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/ReflectionUtils.java
new file mode 100644
index 0000000..ca09992
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/ReflectionUtils.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.util;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+
+/**
+ * Utility class for creating instances via java reflection.
+ *
+ */
+public class ReflectionUtils {
+
+  public static Class<?> getClazz(String className) throws IOException {
+    Class clazz = null;
+    try {
+      clazz = Class.forName(className, true, Thread.currentThread().getContextClassLoader());
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Unable to load class: " + className, e);
+    }
+
+    return clazz;
+  }
+
+  private static <T> T getNewInstance(Class<T> clazz) throws IOException {
+    T instance;
+    try {
+      instance = clazz.newInstance();
+    } catch (InstantiationException e) {
+      throw new IOException(
+          "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
+    } catch (IllegalAccessException e) {
+      throw new IOException(
+          "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
+    }
+    return instance;
+  }
+
+  private static <T> T getNewInstance(Class<T> clazz,
+                                      Class<?>[] parameterTypes,
+                                      Object[] parameters)
+      throws IOException {
+    T instance;
+    try {
+      Constructor<T> constructor = clazz.getConstructor(parameterTypes);
+      instance = constructor.newInstance(parameters);
+    } catch (InstantiationException e) {
+      throw new IOException(
+          "Unable to instantiate class with " + parameters.length + " arguments: " +
+              clazz.getName(), e);
+    } catch (IllegalAccessException e) {
+      throw new IOException(
+          "Unable to instantiate class with " + parameters.length + " arguments: " +
+              clazz.getName(), e);
+    } catch (NoSuchMethodException e) {
+      throw new IOException(
+          "Unable to instantiate class with " + parameters.length + " arguments: " +
+              clazz.getName(), e);
+    } catch (InvocationTargetException e) {
+      throw new IOException(
+          "Unable to instantiate class with " + parameters.length + " arguments: " +
+              clazz.getName(), e);
+    }
+    return instance;
+  }
+
+  public static <T> T createClazzInstance(String className) throws IOException {
+    Class<?> clazz = getClazz(className);
+    @SuppressWarnings("unchecked")
+    T instance = (T) getNewInstance(clazz);
+    return instance;
+  }
+
+  public static <T> T createClazzInstance(String className,
+                                          Class<?>[] parameterTypes,
+                                          Object[] parameters) throws IOException {
+    Class<?> clazz = getClazz(className);
+    T instance = (T) getNewInstance(clazz, parameterTypes, parameters);
+    return instance;
+  }
+
+
+}
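
ReflectionUtils.createClazzInstance loads a class by name and calls the constructor that matches the given parameter types, which is how StopInterpreter builds its RecoveryStorage. The following is a condensed sketch of the same idea, not the committed helper: the class name `ReflectionSketch` and method `create` are hypothetical, the four separate catch blocks are collapsed into their common supertype `ReflectiveOperationException`, and `java.lang.StringBuilder` is used as the target only because it ships with the JDK.

```java
import java.io.IOException;
import java.lang.reflect.Constructor;

public class ReflectionSketch {

  /** Loads className and invokes the public constructor matching parameterTypes. */
  @SuppressWarnings("unchecked")
  static <T> T create(String className,
                      Class<?>[] parameterTypes,
                      Object[] parameters) throws IOException {
    try {
      Class<?> clazz = Class.forName(
          className, true, Thread.currentThread().getContextClassLoader());
      Constructor<?> constructor = clazz.getConstructor(parameterTypes);
      return (T) constructor.newInstance(parameters);
    } catch (ReflectiveOperationException e) {
      // Covers ClassNotFound, NoSuchMethod, Instantiation, IllegalAccess
      // and InvocationTarget exceptions in one place.
      throw new IOException("Unable to instantiate class with "
          + parameters.length + " arguments: " + className, e);
    }
  }

  public static void main(String[] args) throws IOException {
    StringBuilder sb = create("java.lang.StringBuilder",
        new Class<?>[] {String.class}, new Object[] {"recovered"});
    System.out.println(sb.append("!"));
  }
}
```

The cast to `T` is unchecked in both the sketch and the committed helper, so a wrong class name in the configuration surfaces as a ClassCastException at the call site instead of inside the utility.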

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4c8f20ae/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java
index 9df402d..16c8c1d 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java
@@ -33,7 +33,7 @@ public abstract class AbstractInterpreterTest {
   protected File interpreterDir;
   protected File confDir;
   protected File notebookDir;
-  protected ZeppelinConfiguration conf = new ZeppelinConfiguration();
+  protected ZeppelinConfiguration conf;
 
   @Before
   public void setUp() throws Exception {
