Repository: incubator-zeppelin Updated Branches: refs/heads/master 3123d3de8 -> fc870893f
ZEPPELIN-534 Discard broken thrift Client instance ### What is this PR for? Zeppelin has been reusing broken thrift client instances. Since we can catch TException, we can discard client instances that throw TException from the client pool. ### What type of PR is it? Bug Fix | Improvement ### Todos ### Is there a relevant Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-534 ### How should this be tested? 1. run notebook which uses spark interpreter 2. kill spark interpreter with -9 3. run notebook which uses killed interpreter 4. run the same notebook again and check that the error log has changed output of 3 ``` java.net.SocketException: Connection reset at java.net.SocketInputStream.read(SocketInputStream.java:196) at java.net.SocketInputStream.read(SocketInputStream.java:122) at java.io.BufferedInputStream.fill(BufferedInputStream.java:235) at java.io.BufferedInputStream.read1(BufferedInputStream.java:275) at java.io.BufferedInputStream.read(BufferedInputStream.java:334) at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127) at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86) at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429) at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318) at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219) at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69) at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.recv_interpret(RemoteInterpreterService.java:220) at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.interpret(RemoteInterpreterService.java:205) at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.interpret(RemoteInterpreter.java:225) at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93) at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:211) at 
org.apache.zeppelin.scheduler.Job.run(Job.java:169) at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:322) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) ``` output of 4 ``` java.net.ConnectException: Connection refused at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:579) at org.apache.thrift.transport.TSocket.open(TSocket.java:182) at org.apache.zeppelin.interpreter.remote.ClientFactory.create(ClientFactory.java:51) at org.apache.zeppelin.interpreter.remote.ClientFactory.create(ClientFactory.java:37) at org.apache.commons.pool2.BasePooledObjectFactory.makeObject(BasePooledObjectFactory.java:60) at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:861) at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435) at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess.getClient(RemoteInterpreterProcess.java:140) at 
org.apache.zeppelin.interpreter.remote.RemoteInterpreter.interpret(RemoteInterpreter.java:205) at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93) at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:211) at org.apache.zeppelin.scheduler.Job.run(Job.java:169) at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:322) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) ``` The result may differ depending on how many client instances the pool creates in its initial phase. Before applying this change, the output of step 4 would be ```broken pipe```, which means the previous client instance was not discarded. ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? (No) * Are there breaking changes for older versions? (No) * Does this need documentation? 
(No) Author: Jungtaek Lim <[email protected]> Closes #575 from HeartSaVioR/ZEPPELIN-534 and squashes the following commits: c956333 [Jungtaek Lim] ZEPPELIN-534 return thrift client to Pool quietly 22724cf [Jungtaek Lim] ZEPPELIN-534 Discard broken thrift Client instance Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/fc870893 Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/fc870893 Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/fc870893 Branch: refs/heads/master Commit: fc870893f0d2bae2116fb64270d455869268f317 Parents: 3123d3d Author: Jungtaek Lim <[email protected]> Authored: Tue Jan 5 12:55:28 2016 +0900 Committer: Alexander Bezzubov <[email protected]> Committed: Tue Jan 5 18:20:47 2016 +0900 ---------------------------------------------------------------------- .../remote/RemoteAngularObjectRegistry.java | 13 ++++++-- .../interpreter/remote/RemoteInterpreter.java | 31 +++++++++++++++----- .../remote/RemoteInterpreterEventPoller.java | 6 ++-- .../remote/RemoteInterpreterProcess.java | 29 ++++++++++++++++-- .../zeppelin/scheduler/RemoteScheduler.java | 4 ++- 5 files changed, 68 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/fc870893/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java index b7ac014..a7ddf49 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java +++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java @@ -19,6 +19,7 @@ package org.apache.zeppelin.interpreter.remote; import java.util.List; +import org.apache.thrift.TException; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.AngularObjectRegistryListener; @@ -77,15 +78,19 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry { } Client client = null; + boolean broken = false; try { client = remoteInterpreterProcess.getClient(); client.angularObjectAdd(name, noteId, gson.toJson(o)); return super.add(name, o, noteId, true); + } catch (TException e) { + broken = true; + logger.error("Error", e); } catch (Exception e) { logger.error("Error", e); } finally { if (client != null) { - remoteInterpreterProcess.releaseClient(client); + remoteInterpreterProcess.releaseClient(client, broken); } } return null; @@ -106,15 +111,19 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry { } Client client = null; + boolean broken = false; try { client = remoteInterpreterProcess.getClient(); client.angularObjectRemove(name, noteId); return super.remove(name, noteId); + } catch (TException e) { + broken = true; + logger.error("Error", e); } catch (Exception e) { logger.error("Error", e); } finally { if (client != null) { - remoteInterpreterProcess.releaseClient(client); + remoteInterpreterProcess.releaseClient(client, broken); } } return null; http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/fc870893/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 
c72aa7c..455156c 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -131,6 +131,7 @@ public class RemoteInterpreter extends Interpreter { throw new InterpreterException(e1); } + boolean broken = false; try { for (Interpreter intp : this.getInterpreterGroup()) { logger.info("Create remote interpreter {}", intp.getClassName()); @@ -138,9 +139,10 @@ public class RemoteInterpreter extends Interpreter { } } catch (TException e) { + broken = true; throw new InterpreterException(e); } finally { - interpreterProcess.releaseClient(client); + interpreterProcess.releaseClient(client, broken); } } } @@ -158,14 +160,19 @@ public class RemoteInterpreter extends Interpreter { public void close() { RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); Client client = null; + + boolean broken = false; try { client = interpreterProcess.getClient(); client.close(className); + } catch (TException e) { + broken = true; + throw new InterpreterException(e); } catch (Exception e1) { throw new InterpreterException(e1); } finally { if (client != null) { - interpreterProcess.releaseClient(client); + interpreterProcess.releaseClient(client, broken); } getInterpreterProcess().dereference(); } @@ -195,6 +202,7 @@ public class RemoteInterpreter extends Interpreter { interpreterContextRunnerPool.addAll(noteId, runners); } + boolean broken = false; try { GUI settings = context.getGui(); RemoteInterpreterResult remoteResult = client.interpret(className, st, convert(context)); @@ -215,9 +223,10 @@ public class RemoteInterpreter extends Interpreter { InterpreterResult result = convert(remoteResult); return result; } catch (TException e) { + broken = true; throw new InterpreterException(e); } finally { - interpreterProcess.releaseClient(client); + interpreterProcess.releaseClient(client, broken); } } @@ -231,12 +240,14 @@ 
public class RemoteInterpreter extends Interpreter { throw new InterpreterException(e1); } + boolean broken = false; try { client.cancel(className, convert(context)); } catch (TException e) { + broken = true; throw new InterpreterException(e); } finally { - interpreterProcess.releaseClient(client); + interpreterProcess.releaseClient(client, broken); } } @@ -257,13 +268,15 @@ public class RemoteInterpreter extends Interpreter { throw new InterpreterException(e1); } + boolean broken = false; try { formType = FormType.valueOf(client.getFormType(className)); return formType; } catch (TException e) { + broken = true; throw new InterpreterException(e); } finally { - interpreterProcess.releaseClient(client); + interpreterProcess.releaseClient(client, broken); } } @@ -277,12 +290,14 @@ public class RemoteInterpreter extends Interpreter { throw new InterpreterException(e1); } + boolean broken = false; try { return client.getProgress(className, convert(context)); } catch (TException e) { + broken = true; throw new InterpreterException(e); } finally { - interpreterProcess.releaseClient(client); + interpreterProcess.releaseClient(client, broken); } } @@ -297,12 +312,14 @@ public class RemoteInterpreter extends Interpreter { throw new InterpreterException(e1); } + boolean broken = false; try { return client.completion(className, buf, cursor); } catch (TException e) { + broken = true; throw new InterpreterException(e); } finally { - interpreterProcess.releaseClient(client); + interpreterProcess.releaseClient(client, broken); } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/fc870893/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index 1b734b7..d08d43e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -66,16 +66,18 @@ public class RemoteInterpreterEventPoller extends Thread { } RemoteInterpreterEvent event = null; + boolean broken = false; try { event = client.getEvent(); } catch (TException e) { + broken = true; logger.error("Can't get RemoteInterpreterEvent", e); waitQuietly(); continue; + } finally { + interpreterProcess.releaseClient(client, broken); } - interpreterProcess.releaseClient(client); - Gson gson = new Gson(); AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry(); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/fc870893/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index 0c9e877..8d96f4c 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -140,7 +140,27 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler { } public void releaseClient(Client client) { - clientPool.returnObject(client); + releaseClient(client, false); + } + + public void releaseClient(Client client, boolean broken) { + if (broken) { + releaseBrokenClient(client); + } else { + try { + clientPool.returnObject(client); + } catch 
(Exception e) { + logger.warn("exception occurred during releasing thrift client", e); + } + } + } + + public void releaseBrokenClient(Client client) { + try { + clientPool.invalidateObject(client); + } catch (Exception e) { + logger.warn("exception occurred during releasing thrift client", e); + } } public int dereference() { @@ -159,7 +179,8 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler { // safely ignore exception while client.shutdown() may terminates remote process } finally { if (client != null) { - releaseClient(client); + // no longer used + releaseBrokenClient(client); } } @@ -250,13 +271,15 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler { logger.error("Can't update angular object", e); } + boolean broken = false; try { Gson gson = new Gson(); client.angularObjectUpdate(name, noteId, gson.toJson(o)); } catch (TException e) { + broken = true; logger.error("Can't update angular object", e); } finally { - releaseClient(client); + releaseClient(client, broken); } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/fc870893/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java index 51dab12..9be5c22 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java @@ -251,6 +251,7 @@ public class RemoteScheduler implements Scheduler { return Status.ERROR; } + boolean broken = false; try { String statusStr = client.getStatus(job.getId()); if ("Unknown".equals(statusStr)) { @@ -265,6 +266,7 @@ public class RemoteScheduler implements Scheduler { listener.afterStatusChange(job, null, status); 
return status; } catch (TException e) { + broken = true; logger.error("Can't get status information", e); lastStatus = Status.ERROR; return Status.ERROR; @@ -273,7 +275,7 @@ public class RemoteScheduler implements Scheduler { lastStatus = Status.ERROR; return Status.ERROR; } finally { - interpreterProcess.releaseClient(client); + interpreterProcess.releaseClient(client, broken); } } }
