Repository: incubator-zeppelin
Updated Branches:
  refs/heads/branch-0.5 6ed67913d -> e6df65771


ZEPPELIN-79 Zeppelin does not kill some interpreters when server is stopped

https://issues.apache.org/jira/browse/ZEPPELIN-79

Zeppelin sometimes leaves interpreter processes running after it is stopped.
This PR solves the problem by increasing the timeout for graceful shutdown and by closing and destroying interpreters in parallel.

Author: Lee moon soo <[email protected]>

Closes #135 from Leemoonsoo/ZEPPELIN-79 and squashes the following commits:

d2b1fa6 [Lee moon soo] Close and destroy interpreters in parallel
4558417 [Lee moon soo] Increase graceful shutdown timeout

(cherry picked from commit 12e5abf2803e4c5015998672b10642fc72aac0da)
Signed-off-by: Lee moon soo <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/e6df6577
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/e6df6577
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/e6df6577

Branch: refs/heads/branch-0.5
Commit: e6df657714ada918fe345048d87acd04b80af28a
Parents: 6ed6791
Author: Lee moon soo <[email protected]>
Authored: Thu Jul 2 12:05:23 2015 -0700
Committer: Lee moon soo <[email protected]>
Committed: Sun Jul 5 10:45:11 2015 -0700

----------------------------------------------------------------------
 bin/zeppelin-daemon.sh                          | 20 ++++++---
 .../zeppelin/interpreter/InterpreterGroup.java  | 46 ++++++++++++++++++--
 .../remote/RemoteInterpreterProcess.java        |  4 +-
 .../interpreter/InterpreterFactory.java         | 25 ++++++++---
 4 files changed, 77 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/e6df6577/bin/zeppelin-daemon.sh
----------------------------------------------------------------------
diff --git a/bin/zeppelin-daemon.sh b/bin/zeppelin-daemon.sh
index 2440c12..a386f27 100755
--- a/bin/zeppelin-daemon.sh
+++ b/bin/zeppelin-daemon.sh
@@ -101,19 +101,27 @@ function wait_for_zeppelin_to_die() {
   local pid
   local count
   pid=$1
+  timeout=$2
   count=0
-  while [[ "${count}" -lt 10 ]]; do
+  timeoutTime=$(date "+%s")
+  let "timeoutTime+=$timeout"
+  currentTime=$(date "+%s")
+  forceKill=1
+
+  while [[ $currentTime -lt $timeoutTime ]]; do
     $(kill ${pid} > /dev/null 2> /dev/null)
     if kill -0 ${pid} > /dev/null 2>&1; then
       sleep 3
-      let "count+=1"
     else
+      forceKill=0
       break
     fi
-  if [[ "${count}" == "5" ]]; then
+    currentTime=$(date "+%s")
+  done
+
+  if [[ forceKill -ne 0 ]]; then
     $(kill -9 ${pid} > /dev/null 2> /dev/null)
   fi
-  done
 }
 
 function wait_zeppelin_is_up_for_ci() {
@@ -187,7 +195,7 @@ function stop() {
     if [[ -z "${pid}" ]]; then
       echo "${ZEPPELIN_NAME} is not running"
     else
-      wait_for_zeppelin_to_die $pid
+      wait_for_zeppelin_to_die $pid 40
       $(rm -f ${ZEPPELIN_PID})
       action_msg "${ZEPPELIN_NAME} stop" "${SET_OK}"
     fi
@@ -200,7 +208,7 @@ function stop() {
     fi
 
     pid=$(cat ${f})
-    wait_for_zeppelin_to_die $pid
+    wait_for_zeppelin_to_die $pid 20
     $(rm -f ${f})
   done
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/e6df6577/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
index 9baaef3..216663a 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
@@ -18,9 +18,11 @@
 package org.apache.zeppelin.interpreter;
 
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 
+import org.apache.log4j.Logger;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 
 /**
@@ -71,14 +73,50 @@ public class InterpreterGroup extends 
LinkedList<Interpreter>{
   }
 
   public void close() {
-    for (Interpreter intp : this) {
-      intp.close();
+    List<Thread> closeThreads = new LinkedList<Thread>();
+
+    for (final Interpreter intp : this) {
+      Thread t = new Thread() {
+        public void run() {
+          intp.close();
+        }
+      };
+
+      t.start();
+      closeThreads.add(t);
+    }
+
+    for (Thread t : closeThreads) {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+        Logger logger = Logger.getLogger(InterpreterGroup.class);
+        logger.error("Can't close interpreter", e);
+      }
     }
   }
 
   public void destroy() {
-    for (Interpreter intp : this) {
-      intp.destroy();
+    List<Thread> destroyThreads = new LinkedList<Thread>();
+
+    for (final Interpreter intp : this) {
+      Thread t = new Thread() {
+        public void run() {
+          intp.destroy();
+        }
+      };
+
+      t.start();
+      destroyThreads.add(t);
+    }
+
+    for (Thread t : destroyThreads) {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+        Logger logger = Logger.getLogger(InterpreterGroup.class);
+        logger.error("Can't close interpreter", e);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/e6df6577/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index f917eb9..91edd41 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -163,10 +163,10 @@ public class RemoteInterpreterProcess implements 
ExecuteResultHandler {
         clientPool.clear();
         clientPool.close();
 
-        // wait for 3 sec and force kill
+        // wait for some time (connectTimeout) and force kill
         // remote process server.serve() loop is not always finishing 
gracefully
         long startTime = System.currentTimeMillis();
-        while (System.currentTimeMillis() - startTime < 3 * 1000) {
+        while (System.currentTimeMillis() - startTime < connectTimeout) {
           if (this.isRunning()) {
             try {
               Thread.sleep(500);

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/e6df6577/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index 77df7c5..57e2b7a 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -546,13 +546,26 @@ public class InterpreterFactory {
 
 
   public void close() {
+    List<Thread> closeThreads = new LinkedList<Thread>();
     synchronized (interpreterSettings) {
-      synchronized (interpreterSettings) {
-        Collection<InterpreterSetting> intpsettings = 
interpreterSettings.values();
-        for (InterpreterSetting intpsetting : intpsettings) {
-          intpsetting.getInterpreterGroup().close();
-          intpsetting.getInterpreterGroup().destroy();
-        }
+      Collection<InterpreterSetting> intpsettings = 
interpreterSettings.values();
+      for (final InterpreterSetting intpsetting : intpsettings) {
+        Thread t = new Thread() {
+          public void run() {
+            intpsetting.getInterpreterGroup().close();
+            intpsetting.getInterpreterGroup().destroy();
+          }
+        };
+        t.start();
+        closeThreads.add(t);
+      }
+    }
+
+    for (Thread t : closeThreads) {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+        logger.error("Can't close interpreterGroup", e);
       }
     }
   }

Reply via email to