This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 64bf4bb [ZEPPELIN-4390]. ExecutorService is not properly shutdown 64bf4bb is described below commit 64bf4bb3d06583050fa2103251df0a3379c50711 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Wed Oct 23 18:10:10 2019 +0800 [ZEPPELIN-4390]. ExecutorService is not properly shutdown ### What is this PR for? `ExecutorService` is not properly shut down because we did not use the correct API. We should use `shutdownNow` instead of `shutdown`: `shutdown` only stops accepting new tasks and lets already-submitted tasks keep running, whereas `shutdownNow` also attempts to stop running tasks by interrupting their worker threads. See https://stackoverflow.com/questions/11520189/difference-between-shutdown-and-shutdownnow-of-executor-service. This issue causes thread resource leaks. ### What type of PR is it? [Improvement] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4390 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3491 from zjffdu/ZEPPELIN-4390 and squashes the following commits: 439221492 [Jeff Zhang] [ZEPPELIN-4390]. 
ExecutorService is not properly shutdown --- .../org/apache/zeppelin/python/IPythonInterpreterTest.java | 3 ++- .../org/apache/zeppelin/interpreter/InterpreterGroup.java | 1 + .../zeppelin/interpreter/remote/RemoteInterpreterServer.java | 5 ++++- .../org/apache/zeppelin/scheduler/AbstractScheduler.java | 2 +- .../java/org/apache/zeppelin/scheduler/FIFOScheduler.java | 12 +++++++++--- .../java/org/apache/zeppelin/scheduler/SchedulerFactory.java | 6 +++++- .../zeppelin/interpreter/RemoteInterpreterEventServer.java | 1 + 7 files changed, 23 insertions(+), 7 deletions(-) diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java index 87e5071..4268ebb 100644 --- a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java @@ -130,7 +130,8 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest { result = interpreter.interpret(codeKillKernel, context); assertEquals(Code.ERROR, result.code()); output = context.out.toInterpreterResultMessage().get(0); - assertTrue(output.getData().equals("Ipython kernel has been stopped. Please check logs. " + assertTrue(output.getData(), + output.getData().equals("Ipython kernel has been stopped. Please check logs. 
" + "It might be because of an out of memory issue.")); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java index 4cf4b31..aa73a47 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java @@ -167,6 +167,7 @@ public class InterpreterGroup { for (Interpreter interpreter : session) { try { interpreter.close(); + interpreter.getScheduler().stop(); } catch (InterpreterException e) { LOGGER.warn("Fail to close interpreter: " + interpreter.getClassName(), e); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index a143cd3..b2fc061 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -74,6 +74,7 @@ import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.scheduler.JobListener; import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; import org.apache.zeppelin.user.AuthenticationInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -255,7 +256,9 @@ public class RemoteInterpreterServer extends Thread } } } - + if (!isTest) { + SchedulerFactory.singleton().destroy(); + } server.stop(); // server.stop() does not always finish server.serve() loop diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java index 
c264b9b..85680ed 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/AbstractScheduler.java @@ -76,7 +76,7 @@ public abstract class AbstractScheduler implements Scheduler { @Override public void run() { - while (!terminate) { + while (!terminate && !Thread.currentThread().isInterrupted()) { Job runningJob = null; try { runningJob = queue.take(); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java index 30e0763..b9d5e82 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java @@ -17,7 +17,7 @@ package org.apache.zeppelin.scheduler; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** @@ -25,12 +25,12 @@ import java.util.concurrent.Executors; */ public class FIFOScheduler extends AbstractScheduler { - private Executor executor; + private ExecutorService executor; FIFOScheduler(String name) { super(name); executor = Executors.newSingleThreadExecutor( - new SchedulerThreadFactory("FIFOScheduler-Worker-")); + new SchedulerThreadFactory("FIFOScheduler-" + name + "-Worker-")); } @Override @@ -38,4 +38,10 @@ public class FIFOScheduler extends AbstractScheduler { // run job in the SingleThreadExecutor since this is FIFO. 
executor.execute(() -> runJob(job)); } + + @Override + public void stop() { + super.stop(); + executor.shutdownNow(); + } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java index 8b525e8..28cdb3e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java @@ -64,7 +64,11 @@ public class SchedulerFactory { } public void destroy() { - ExecutorFactory.singleton().shutdown("SchedulerFactory"); + LOGGER.info("Destroy all executors"); + ExecutorFactory.singleton().shutdown(SCHEDULER_EXECUTOR_NAME); + this.executor.shutdownNow(); + this.executor = null; + singleton = null; } public Scheduler createOrGetFIFOScheduler(String name) { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java index 7f00934..41164ef 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java @@ -140,6 +140,7 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi if (appendFuture != null) { appendFuture.cancel(true); } + appendService.shutdownNow(); LOGGER.info("RemoteInterpreterEventServer is stopped"); }