Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master 0a68c0b11 -> f8aeee11e


[ZEPPELIN-535] "Scheduler already terminated" occurs when 
RemoteInterpreter.close() doesn't succeed

### What is this PR for?
Fix the exception "Scheduler already terminated" when remove interpreter 
close() fails

### What type of PR is it?
Bug Fix

### Is there a relevant Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-535

### How should this be tested?
Modify any interpreter to throw exception on close() call.
And try to use it after restart the interpreter.

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? no
* Is there breaking changes for older versions? no
* Does this needs documentation? no

Author: Lee moon soo <[email protected]>

Closes #574 from Leemoonsoo/ZEPPELIN-535 and squashes the following commits:

66d7c09 [Lee moon soo] Remove unnecessary check in test
9b7e8d5 [Lee moon soo] Remove interpreterGroupReference


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/f8aeee11
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/f8aeee11
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/f8aeee11

Branch: refs/heads/master
Commit: f8aeee11e3bb447ee392cfb004b0acbe2e6bc435
Parents: 0a68c0b
Author: Lee moon soo <[email protected]>
Authored: Sun Dec 27 19:55:28 2015 -0800
Committer: Lee moon soo <[email protected]>
Committed: Mon Dec 28 17:34:54 2015 -0800

----------------------------------------------------------------------
 .../zeppelin/interpreter/InterpreterGroup.java  | 17 ++++
 .../interpreter/remote/RemoteInterpreter.java   | 81 +++++---------------
 .../remote/RemoteInterpreterTest.java           |  6 --
 3 files changed, 36 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f8aeee11/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
index 216663a..9256bcd 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
@@ -24,6 +24,7 @@ import java.util.Random;
 
 import org.apache.log4j.Logger;
 import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
 
 /**
  * InterpreterGroup is list of interpreters in the same group.
@@ -33,6 +34,7 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
   String id;
 
   AngularObjectRegistry angularObjectRegistry;
+  RemoteInterpreterProcess remoteInterpreterProcess;    // attached remote 
interpreter process
 
   public InterpreterGroup(String id) {
     this.id = id;
@@ -72,6 +74,14 @@ public class InterpreterGroup extends 
LinkedList<Interpreter>{
     this.angularObjectRegistry = angularObjectRegistry;
   }
 
+  public RemoteInterpreterProcess getRemoteInterpreterProcess() {
+    return remoteInterpreterProcess;
+  }
+
+  public void setRemoteInterpreterProcess(RemoteInterpreterProcess 
remoteInterpreterProcess) {
+    this.remoteInterpreterProcess = remoteInterpreterProcess;
+  }
+
   public void close() {
     List<Thread> closeThreads = new LinkedList<Thread>();
 
@@ -118,5 +128,12 @@ public class InterpreterGroup extends 
LinkedList<Interpreter>{
         logger.error("Can't close interpreter", e);
       }
     }
+
+    // make sure remote interpreter process terminates
+    if (remoteInterpreterProcess != null) {
+      while (remoteInterpreterProcess.referenceCount() > 0) {
+        remoteInterpreterProcess.dereference();
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f8aeee11/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 3ac5121..c72aa7c 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -56,8 +56,6 @@ public class RemoteInterpreter extends Interpreter {
   FormType formType;
   boolean initialized;
   private Map<String, String> env;
-  static Map<String, RemoteInterpreterProcess> interpreterGroupReference
-    = new HashMap<String, RemoteInterpreterProcess>();
 
   private int connectTimeout;
 
@@ -96,19 +94,21 @@ public class RemoteInterpreter extends Interpreter {
   }
 
   public RemoteInterpreterProcess getInterpreterProcess() {
-    synchronized (interpreterGroupReference) {
-      if 
(interpreterGroupReference.containsKey(getInterpreterGroupKey(getInterpreterGroup())))
 {
-        RemoteInterpreterProcess interpreterProcess = interpreterGroupReference
-            .get(getInterpreterGroupKey(getInterpreterGroup()));
-        try {
-          return interpreterProcess;
-        } catch (Exception e) {
-          throw new InterpreterException(e);
-        }
-      } else {
-        // closed or not opened yet
-        return null;
+    InterpreterGroup intpGroup = getInterpreterGroup();
+    if (intpGroup == null) {
+      return null;
+    }
+
+    synchronized (intpGroup) {
+      if (intpGroup.getRemoteInterpreterProcess() == null) {
+        // create new remote process
+        RemoteInterpreterProcess remoteProcess = new RemoteInterpreterProcess(
+                interpreterRunner, interpreterPath, env, connectTimeout);
+
+        intpGroup.setRemoteInterpreterProcess(remoteProcess);
       }
+
+      return intpGroup.getRemoteInterpreterProcess();
     }
   }
 
@@ -117,17 +117,7 @@ public class RemoteInterpreter extends Interpreter {
       return;
     }
 
-    RemoteInterpreterProcess interpreterProcess = null;
-
-    synchronized (interpreterGroupReference) {
-      if 
(interpreterGroupReference.containsKey(getInterpreterGroupKey(getInterpreterGroup())))
 {
-        interpreterProcess = interpreterGroupReference
-            .get(getInterpreterGroupKey(getInterpreterGroup()));
-      } else {
-        throw new InterpreterException("Unexpected error");
-      }
-    }
-
+    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
     int rc = interpreterProcess.reference(getInterpreterGroup());
 
     synchronized (interpreterProcess) {
@@ -170,24 +160,14 @@ public class RemoteInterpreter extends Interpreter {
     Client client = null;
     try {
       client = interpreterProcess.getClient();
+      client.close(className);
     } catch (Exception e1) {
       throw new InterpreterException(e1);
-    }
-
-    try {
-      client.close(className);
-    } catch (TException e) {
-      throw new InterpreterException(e);
     } finally {
-      interpreterProcess.releaseClient(client);
-    }
-
-    int r = interpreterProcess.dereference();
-    if (r == 0) {
-      synchronized (interpreterGroupReference) {
-        InterpreterGroup intpGroup = getInterpreterGroup();
-        interpreterGroupReference.remove(getInterpreterGroupKey(intpGroup));
+      if (client != null) {
+        interpreterProcess.releaseClient(client);
       }
+      getInterpreterProcess().dereference();
     }
   }
 
@@ -339,29 +319,6 @@ public class RemoteInterpreter extends Interpreter {
     }
   }
 
-
-  @Override
-  public void setInterpreterGroup(InterpreterGroup interpreterGroup) {
-    super.setInterpreterGroup(interpreterGroup);
-
-    synchronized (interpreterGroupReference) {
-      RemoteInterpreterProcess intpProcess = interpreterGroupReference
-          .get(getInterpreterGroupKey(interpreterGroup));
-
-      // when interpreter process is not created or terminated
-      if (intpProcess == null || (!intpProcess.isRunning() && 
intpProcess.getPort() > 0)
-          || (!intpProcess.isRunning() && intpProcess.getPort() == -1)) {
-        interpreterGroupReference.put(getInterpreterGroupKey(interpreterGroup),
-            new RemoteInterpreterProcess(interpreterRunner,
-                interpreterPath, env, connectTimeout));
-
-        logger.info("setInterpreterGroup = "
-            + getInterpreterGroupKey(interpreterGroup) + " class=" + className
-            + ", path=" + interpreterPath);
-      }
-    }
-  }
-
   private String getInterpreterGroupKey(InterpreterGroup interpreterGroup) {
     return interpreterGroup.getId();
   }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f8aeee11/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
index bbda252..c938ff3 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java
@@ -219,9 +219,6 @@ public class RemoteInterpreterTest {
 
     intpA.close();
     intpB.close();
-
-    RemoteInterpreterProcess process = intpA.getInterpreterProcess();
-    assertNull(process);
   }
 
   @Test
@@ -337,9 +334,6 @@ public class RemoteInterpreterTest {
 
     intpA.close();
     intpB.close();
-
-    RemoteInterpreterProcess process = intpA.getInterpreterProcess();
-    assertNull(process);
   }
 
   @Test

Reply via email to