This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 69faeaf On shutdown, kill the function instance thread after interrupt attempt (#3749) 69faeaf is described below commit 69faeaf80a70947fca18e36d47cdf7540560d637 Author: Matteo Merli <mme...@apache.org> AuthorDate: Wed Mar 6 14:39:26 2019 -0800 On shutdown, kill the function instance thread after interrupt attempt (#3749) --- .../org/apache/pulsar/functions/runtime/ThreadRuntime.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java index ad1002c..93246f2 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java @@ -44,6 +44,8 @@ class ThreadRuntime implements Runtime { // The thread that invokes the function private Thread fnThread; + private static final int THREAD_SHUTDOWN_TIMEOUT_MILLIS = 10_000; + @Getter private InstanceConfig instanceConfig; private JavaInstanceRunnable javaInstanceRunnable; @@ -114,13 +116,19 @@ class ThreadRuntime implements Runtime { } } + @SuppressWarnings("deprecation") @Override public void stop() { if (fnThread != null) { // interrupt the instance thread fnThread.interrupt(); try { - fnThread.join(); + // If the instance thread doesn't respond within some time, attempt to + // kill the thread + fnThread.join(THREAD_SHUTDOWN_TIMEOUT_MILLIS, 0); + if (fnThread.isAlive()) { + fnThread.stop(); + } } catch (InterruptedException e) { // ignore this } @@ -152,8 +160,8 @@ class ThreadRuntime implements Runtime { public CompletableFuture<InstanceCommunication.MetricsData> getAndResetMetrics() { return CompletableFuture.completedFuture(javaInstanceRunnable.getAndResetMetrics()); } - - + + @Override public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId) { return CompletableFuture.completedFuture(javaInstanceRunnable.getMetrics());