Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master 3123d3de8 -> fc870893f


ZEPPELIN-534 Discard broken thrift Client instance

### What is this PR for?

Zeppelin has been reusing broken thrift client instances.
Since we can catch TException, we can discard from the client pool any client 
instance that throws TException, instead of returning it for reuse.
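
The idea can be sketched in a few lines of plain Java. This is a minimal illustration under stated assumptions, not the PR's actual code: `Client`, `SimplePool`, `callOnce`, and `killIdlePeers` are hypothetical stand-ins (the real change uses the thrift `Client` and the commons-pool2 pool shown in the diff), `IOException` stands in for `TException`, and the sketch assumes that opening a new connection succeeds.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

class BrokenClientSketch {

  /** Hypothetical stand-in for a thrift client; calls fail once the peer is gone. */
  static class Client {
    boolean peerAlive = true;

    String call() throws IOException {
      if (!peerAlive) {
        throw new IOException("Connection reset"); // stand-in for TException
      }
      return "ok";
    }
  }

  /** Hypothetical minimal pool (not commons-pool2): keeps idle clients for reuse. */
  static class SimplePool {
    private final Deque<Client> idle = new ArrayDeque<>();
    int discarded = 0;

    Client borrow() {
      Client c = idle.poll();
      return (c != null) ? c : new Client(); // assumption: a new connection succeeds
    }

    void giveBack(Client c) {
      idle.push(c);
    }

    void invalidate(Client c) {
      discarded++; // dropped, never handed out again
    }

    int idleCount() {
      return idle.size();
    }

    /** Simulates the remote process dying while clients sit idle in the pool. */
    void killIdlePeers() {
      for (Client c : idle) {
        c.peerAlive = false;
      }
    }
  }

  /** Borrow, call, release; a client that failed is discarded instead of returned. */
  static String callOnce(SimplePool pool) {
    Client client = pool.borrow();
    boolean broken = false;
    try {
      return client.call();
    } catch (IOException e) {
      broken = true;
      return "error: " + e.getMessage();
    } finally {
      if (broken) {
        pool.invalidate(client);
      } else {
        pool.giveBack(client);
      }
    }
  }

  public static void main(String[] args) {
    SimplePool pool = new SimplePool();
    System.out.println(callOnce(pool)); // healthy client, returned to the pool
    pool.killIdlePeers();
    System.out.println(callOnce(pool)); // stale client fails and is discarded
    System.out.println(callOnce(pool)); // pool is empty, so a fresh client is created
  }
}
```

The `broken` flag is set only in the catch block and read only in `finally`, so the release decision is made in one place regardless of how the call exits.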

### What type of PR is it?
Bug Fix | Improvement

### Todos

### Is there a relevant Jira issue?

https://issues.apache.org/jira/browse/ZEPPELIN-534

### How should this be tested?

1. Run a notebook that uses the Spark interpreter.
2. Kill the Spark interpreter process with `kill -9`.
3. Run a notebook that uses the killed interpreter.
4. Run the same notebook again and check that the error in the log has changed.

Output of step 3:
```
java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:196)
    at java.net.SocketInputStream.read(SocketInputStream.java:122)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
    at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
    at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
    at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
    at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.recv_interpret(RemoteInterpreterService.java:220)
    at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.interpret(RemoteInterpreterService.java:205)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.interpret(RemoteInterpreter.java:225)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93)
    at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:211)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:169)
    at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:322)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
```

Output of step 4:
```
java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at org.apache.thrift.transport.TSocket.open(TSocket.java:182)
    at org.apache.zeppelin.interpreter.remote.ClientFactory.create(ClientFactory.java:51)
    at org.apache.zeppelin.interpreter.remote.ClientFactory.create(ClientFactory.java:37)
    at org.apache.commons.pool2.BasePooledObjectFactory.makeObject(BasePooledObjectFactory.java:60)
    at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:861)
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435)
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess.getClient(RemoteInterpreterProcess.java:140)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.interpret(RemoteInterpreter.java:205)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93)
    at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:211)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:169)
    at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:322)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
```

The result can differ depending on how many client instances the pool creates 
in its initial phase.
Before this change, the output of step 4 would be `broken pipe`, which means 
the previous client instance was not discarded.
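
One way to read the two statements above is as a toy simulation. Everything here is hypothetical (`PoolDrainSketch`, `simulate`, and the error labels are illustrative only), and it assumes that every pooled client is stale after the kill and that any new connection attempt is refused, as in the step 4 trace where `borrowObject` ends up in `create` and `TSocket.open`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class PoolDrainSketch {

  /**
   * Simulates consecutive notebook runs against a dead interpreter process.
   *
   * @param staleIdleClients clients already sitting in the pool when the process dies
   * @param runs             number of notebook runs to simulate
   * @param discardBroken    true models this change, false models the old behavior
   * @return one error label per run
   */
  static List<String> simulate(int staleIdleClients, int runs, boolean discardBroken) {
    Deque<String> idle = new ArrayDeque<>();
    for (int i = 0; i < staleIdleClients; i++) {
      idle.add("stale-" + i);
    }

    List<String> errors = new ArrayList<>();
    for (int run = 0; run < runs; run++) {
      String client = idle.poll();
      if (client == null) {
        // Pool is empty: a new connection is attempted and the dead process refuses it.
        errors.add("connection refused");
        continue;
      }
      // A pooled client exists, but its socket points at the dead process.
      errors.add("stale client failed");
      if (!discardBroken) {
        idle.add(client); // old behavior: the broken client goes back into the pool
      }
    }
    return errors;
  }

  public static void main(String[] args) {
    System.out.println("with discard:    " + simulate(2, 4, true));
    System.out.println("without discard: " + simulate(2, 4, false));
  }
}
```

With discarding enabled, the number of runs that hit a stale client is bounded by the number of clients that were pooled, which would explain why the observed step can vary. Without discarding, the stale clients keep circulating.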

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? (No)
* Are there breaking changes for older versions? (No)
* Does this need documentation? (No)

Author: Jungtaek Lim <[email protected]>

Closes #575 from HeartSaVioR/ZEPPELIN-534 and squashes the following commits:

c956333 [Jungtaek Lim] ZEPPELIN-534 return thrift client to Pool quietly
22724cf [Jungtaek Lim] ZEPPELIN-534 Discard broken thrift Client instance


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/fc870893
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/fc870893
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/fc870893

Branch: refs/heads/master
Commit: fc870893f0d2bae2116fb64270d455869268f317
Parents: 3123d3d
Author: Jungtaek Lim <[email protected]>
Authored: Tue Jan 5 12:55:28 2016 +0900
Committer: Alexander Bezzubov <[email protected]>
Committed: Tue Jan 5 18:20:47 2016 +0900

----------------------------------------------------------------------
 .../remote/RemoteAngularObjectRegistry.java     | 13 ++++++--
 .../interpreter/remote/RemoteInterpreter.java   | 31 +++++++++++++++-----
 .../remote/RemoteInterpreterEventPoller.java    |  6 ++--
 .../remote/RemoteInterpreterProcess.java        | 29 ++++++++++++++++--
 .../zeppelin/scheduler/RemoteScheduler.java     |  4 ++-
 5 files changed, 68 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/fc870893/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
index b7ac014..a7ddf49 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
@@ -19,6 +19,7 @@ package org.apache.zeppelin.interpreter.remote;
 
 import java.util.List;
 
+import org.apache.thrift.TException;
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.AngularObjectRegistryListener;
@@ -77,15 +78,19 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
     }
 
     Client client = null;
+    boolean broken = false;
     try {
       client = remoteInterpreterProcess.getClient();
       client.angularObjectAdd(name, noteId, gson.toJson(o));
       return super.add(name, o, noteId, true);
+    } catch (TException e) {
+      broken = true;
+      logger.error("Error", e);
     } catch (Exception e) {
       logger.error("Error", e);
     } finally {
       if (client != null) {
-        remoteInterpreterProcess.releaseClient(client);
+        remoteInterpreterProcess.releaseClient(client, broken);
       }
     }
     return null;
@@ -106,15 +111,19 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
     }
 
     Client client = null;
+    boolean broken = false;
     try {
       client = remoteInterpreterProcess.getClient();
       client.angularObjectRemove(name, noteId);
       return super.remove(name, noteId);
+    } catch (TException e) {
+      broken = true;
+      logger.error("Error", e);
     } catch (Exception e) {
       logger.error("Error", e);
     } finally {
       if (client != null) {
-        remoteInterpreterProcess.releaseClient(client);
+        remoteInterpreterProcess.releaseClient(client, broken);
       }
     }
     return null;

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/fc870893/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index c72aa7c..455156c 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -131,6 +131,7 @@ public class RemoteInterpreter extends Interpreter {
           throw new InterpreterException(e1);
         }
 
+        boolean broken = false;
         try {
           for (Interpreter intp : this.getInterpreterGroup()) {
             logger.info("Create remote interpreter {}", intp.getClassName());
@@ -138,9 +139,10 @@ public class RemoteInterpreter extends Interpreter {
 
           }
         } catch (TException e) {
+          broken = true;
           throw new InterpreterException(e);
         } finally {
-          interpreterProcess.releaseClient(client);
+          interpreterProcess.releaseClient(client, broken);
         }
       }
     }
@@ -158,14 +160,19 @@ public class RemoteInterpreter extends Interpreter {
   public void close() {
     RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
     Client client = null;
+
+    boolean broken = false;
     try {
       client = interpreterProcess.getClient();
       client.close(className);
+    } catch (TException e) {
+      broken = true;
+      throw new InterpreterException(e);
     } catch (Exception e1) {
       throw new InterpreterException(e1);
     } finally {
       if (client != null) {
-        interpreterProcess.releaseClient(client);
+        interpreterProcess.releaseClient(client, broken);
       }
       getInterpreterProcess().dereference();
     }
@@ -195,6 +202,7 @@ public class RemoteInterpreter extends Interpreter {
       interpreterContextRunnerPool.addAll(noteId, runners);
     }
 
+    boolean broken = false;
     try {
       GUI settings = context.getGui();
       RemoteInterpreterResult remoteResult = client.interpret(className, st, convert(context));
@@ -215,9 +223,10 @@ public class RemoteInterpreter extends Interpreter {
       InterpreterResult result = convert(remoteResult);
       return result;
     } catch (TException e) {
+      broken = true;
       throw new InterpreterException(e);
     } finally {
-      interpreterProcess.releaseClient(client);
+      interpreterProcess.releaseClient(client, broken);
     }
   }
 
@@ -231,12 +240,14 @@ public class RemoteInterpreter extends Interpreter {
       throw new InterpreterException(e1);
     }
 
+    boolean broken = false;
     try {
       client.cancel(className, convert(context));
     } catch (TException e) {
+      broken = true;
       throw new InterpreterException(e);
     } finally {
-      interpreterProcess.releaseClient(client);
+      interpreterProcess.releaseClient(client, broken);
     }
   }
 
@@ -257,13 +268,15 @@ public class RemoteInterpreter extends Interpreter {
       throw new InterpreterException(e1);
     }
 
+    boolean broken = false;
     try {
       formType = FormType.valueOf(client.getFormType(className));
       return formType;
     } catch (TException e) {
+      broken = true;
       throw new InterpreterException(e);
     } finally {
-      interpreterProcess.releaseClient(client);
+      interpreterProcess.releaseClient(client, broken);
     }
   }
 
@@ -277,12 +290,14 @@ public class RemoteInterpreter extends Interpreter {
       throw new InterpreterException(e1);
     }
 
+    boolean broken = false;
     try {
       return client.getProgress(className, convert(context));
     } catch (TException e) {
+      broken = true;
       throw new InterpreterException(e);
     } finally {
-      interpreterProcess.releaseClient(client);
+      interpreterProcess.releaseClient(client, broken);
     }
   }
 
@@ -297,12 +312,14 @@ public class RemoteInterpreter extends Interpreter {
       throw new InterpreterException(e1);
     }
 
+    boolean broken = false;
     try {
       return client.completion(className, buf, cursor);
     } catch (TException e) {
+      broken = true;
       throw new InterpreterException(e);
     } finally {
-      interpreterProcess.releaseClient(client);
+      interpreterProcess.releaseClient(client, broken);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/fc870893/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
index 1b734b7..d08d43e 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -66,16 +66,18 @@ public class RemoteInterpreterEventPoller extends Thread {
       }
 
       RemoteInterpreterEvent event = null;
+      boolean broken = false;
       try {
         event = client.getEvent();
       } catch (TException e) {
+        broken = true;
         logger.error("Can't get RemoteInterpreterEvent", e);
         waitQuietly();
         continue;
+      } finally {
+        interpreterProcess.releaseClient(client, broken);
       }
 
-      interpreterProcess.releaseClient(client);
-
       Gson gson = new Gson();
 
       AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry();

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/fc870893/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index 0c9e877..8d96f4c 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -140,7 +140,27 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
   }
 
   public void releaseClient(Client client) {
-    clientPool.returnObject(client);
+    releaseClient(client, false);
+  }
+
+  public void releaseClient(Client client, boolean broken) {
+    if (broken) {
+      releaseBrokenClient(client);
+    } else {
+      try {
+        clientPool.returnObject(client);
+      } catch (Exception e) {
+        logger.warn("exception occurred during releasing thrift client", e);
+      }
+    }
+  }
+
+  public void releaseBrokenClient(Client client) {
+    try {
+      clientPool.invalidateObject(client);
+    } catch (Exception e) {
+      logger.warn("exception occurred during releasing thrift client", e);
+    }
   }
 
   public int dereference() {
@@ -159,7 +179,8 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
           // safely ignore exception while client.shutdown() may terminates remote process
         } finally {
           if (client != null) {
-            releaseClient(client);
+            // no longer used
+            releaseBrokenClient(client);
           }
         }
 
@@ -250,13 +271,15 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
       logger.error("Can't update angular object", e);
     }
 
+    boolean broken = false;
     try {
       Gson gson = new Gson();
       client.angularObjectUpdate(name, noteId, gson.toJson(o));
     } catch (TException e) {
+      broken = true;
       logger.error("Can't update angular object", e);
     } finally {
-      releaseClient(client);
+      releaseClient(client, broken);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/fc870893/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
index 51dab12..9be5c22 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java
@@ -251,6 +251,7 @@ public class RemoteScheduler implements Scheduler {
         return Status.ERROR;
       }
 
+      boolean broken = false;
       try {
         String statusStr = client.getStatus(job.getId());
         if ("Unknown".equals(statusStr)) {
@@ -265,6 +266,7 @@ public class RemoteScheduler implements Scheduler {
         listener.afterStatusChange(job, null, status);
         return status;
       } catch (TException e) {
+        broken = true;
         logger.error("Can't get status information", e);
         lastStatus = Status.ERROR;
         return Status.ERROR;
@@ -273,7 +275,7 @@ public class RemoteScheduler implements Scheduler {
         lastStatus = Status.ERROR;
         return Status.ERROR;
       } finally {
-        interpreterProcess.releaseClient(client);
+        interpreterProcess.releaseClient(client, broken);
       }
     }
   }
