shutting down thread pools from event receivers
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/c3cc2bb4 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/c3cc2bb4 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/c3cc2bb4 Branch: refs/heads/stratos-4.1.x Commit: c3cc2bb47410655269d7af5ade68743e1cc2f9bd Parents: ede4990 Author: Isuru Haththotuwa <[email protected]> Authored: Fri Dec 18 07:23:07 2015 +0530 Committer: Isuru Haththotuwa <[email protected]> Committed: Fri Dec 25 16:56:17 2015 +0530 ---------------------------------------------------------------------- .../common/threading/StratosThreadPool.java | 25 ++++++++++++++++++++ .../message/receiver/StratosEventReceiver.java | 1 + .../application/ApplicationsEventReceiver.java | 1 + .../signup/ApplicationSignUpEventReceiver.java | 1 + .../status/ClusterStatusEventReceiver.java | 5 +--- .../mapping/DomainMappingEventReceiver.java | 1 + .../health/stat/HealthStatEventReceiver.java | 1 + .../initializer/InitializerEventReceiver.java | 2 +- .../notifier/InstanceNotifierEventReceiver.java | 4 +--- .../status/InstanceStatusEventReceiver.java | 1 + .../receiver/tenant/TenantEventReceiver.java | 1 + .../topology/TopologyEventReceiver.java | 1 + 12 files changed, 36 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/c3cc2bb4/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java index 1f4e5c8..b72ac84 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java @@ -233,4 +233,29 @@ public class StratosThreadPool { " and removed from the cache"); } } + + public static void shutdown (String identifier) { + + ExecutorService executorService = executorMap.get(identifier); + if (executorService == null) { + log.warn("No executor service found for id " + identifier + ", unable to shut down"); + return; + } + + // try to shut down gracefully + executorService.shutdown(); + // wait 10 secs till terminated + try { + if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { + log.info("Thread Pool [id] " + identifier + " did not finish all tasks before " + + "timeout, forcefully shutting down"); + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + // interrupted, shutdown now + executorService.shutdownNow(); + } + + log.info("Successfully shutdown thread pool associated with id: " + identifier); + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/c3cc2bb4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java index 0b07940..e86a05f 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java @@ -24,6 +24,7 @@ import java.util.concurrent.ThreadPoolExecutor; public class StratosEventReceiver { protected ThreadPoolExecutor executor; + protected String threadPoolId; public StratosEventReceiver () { } http://git-wip-us.apache.org/repos/asf/stratos/blob/c3cc2bb4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java index a68e0e5..df7a006 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java @@ -94,6 +94,7 @@ public class ApplicationsEventReceiver extends StratosEventReceiver{ public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); + StratosThreadPool.shutdown(threadPoolId); } public void initializeCompleteApplicationsModel() { http://git-wip-us.apache.org/repos/asf/stratos/blob/c3cc2bb4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java index a28a98f..3fd43c1 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java @@ -116,5 +116,6 @@ public class ApplicationSignUpEventReceiver extends StratosEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); + StratosThreadPool.shutdown(threadPoolId); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/c3cc2bb4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java index ecaa758..e014a98 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java @@ -89,14 +89,11 @@ public class ClusterStatusEventReceiver extends StratosEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); + StratosThreadPool.shutdown(threadPoolId); } public boolean isSubscribed() { return ((eventSubscriber != null) && (eventSubscriber.isSubscribed())); } - public void terminate() { - eventSubscriber.terminate(); - messageDelegator.terminate(); - } } http://git-wip-us.apache.org/repos/asf/stratos/blob/c3cc2bb4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java index 844f19b..4d8bca6 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java @@ -67,6 +67,7 @@ public class DomainMappingEventReceiver extends StratosEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); + StratosThreadPool.shutdown(threadPoolId); } private void execute() { http://git-wip-us.apache.org/repos/asf/stratos/blob/c3cc2bb4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java index cc94c6d..689f9ca 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java @@ -86,5 +86,6 @@ public class HealthStatEventReceiver extends StratosEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); + StratosThreadPool.shutdown(threadPoolId); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/c3cc2bb4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java index b3a29b1..0711293 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java @@ -33,7 +33,6 @@ public class InitializerEventReceiver extends StratosEventReceiver { private InitializerEventMessageListener messageListener; private EventSubscriber eventSubscriber; private static volatile InitializerEventReceiver instance; - //private ExecutorService executor; private InitializerEventReceiver() { // TODO: make pool size configurable @@ -82,5 +81,6 @@ public class InitializerEventReceiver extends StratosEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); + StratosThreadPool.shutdown(threadPoolId); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/c3cc2bb4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java index 8eb2bb0..4d02d18 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java @@ -36,7 +36,6 @@ public class InstanceNotifierEventReceiver extends StratosEventReceiver { private EventSubscriber eventSubscriber; private InstanceNotifierEventMessageListener messageListener; private static volatile InstanceNotifierEventReceiver instance; - //private boolean terminated; private InstanceNotifierEventReceiver() { // TODO: make pool size configurable @@ -94,7 +93,6 @@ public class InstanceNotifierEventReceiver extends StratosEventReceiver { public synchronized void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); - //terminated = true; - log.info("InstanceNotifierEventReceiver terminated"); + StratosThreadPool.shutdown(threadPoolId); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/c3cc2bb4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java index 5471915..9e98155 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java @@ -91,5 +91,6 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); + StratosThreadPool.shutdown(threadPoolId); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/c3cc2bb4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java index 1a8473e..94feef0 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java @@ -112,5 +112,6 @@ public class TenantEventReceiver extends StratosEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); + StratosThreadPool.shutdown(threadPoolId); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/c3cc2bb4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java index 64af927..2fea887 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java @@ -95,6 +95,7 @@ public class TopologyEventReceiver extends StratosEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); + StratosThreadPool.shutdown(threadPoolId); } public void initializeCompleteTopology() {
