Shut down event receiver thread pools when the receivers are terminated
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/570d74ea Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/570d74ea Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/570d74ea Branch: refs/heads/stratos-4.1.x Commit: 570d74ea5543908c2362cc9b3c6224276921de48 Parents: 4c6442a Author: Isuru Haththotuwa <[email protected]> Authored: Fri Dec 18 07:23:07 2015 +0530 Committer: Isuru Haththotuwa <[email protected]> Committed: Fri Dec 18 07:23:07 2015 +0530 ---------------------------------------------------------------------- .../common/threading/StratosThreadPool.java | 30 +++++++++++++++++--- .../message/receiver/StratosEventReceiver.java | 1 + .../application/ApplicationsEventReceiver.java | 4 ++- .../signup/ApplicationSignUpEventReceiver.java | 4 ++- .../status/ClusterStatusEventReceiver.java | 4 ++- .../mapping/DomainMappingEventReceiver.java | 4 ++- .../health/stat/HealthStatEventReceiver.java | 4 ++- .../initializer/InitializerEventReceiver.java | 4 ++- .../notifier/InstanceNotifierEventReceiver.java | 7 ++--- .../status/InstanceStatusEventReceiver.java | 5 ++-- .../receiver/tenant/TenantEventReceiver.java | 4 ++- .../topology/TopologyEventReceiver.java | 4 ++- 12 files changed, 57 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java index 687cec2..b28fb40 100644 --- 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java @@ -24,10 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.*; /** * Utility class for Stratos thread pool @@ -84,4 +81,29 @@ public class StratosThreadPool { } return scheduledExecutorService; } + + public static void shutdown (String identifier) { + + ExecutorService executorService = executorServiceMap.get(identifier); + if (executorService == null) { + log.warn("No executor service found for id " + identifier + ", unable to shut down"); + return; + } + + // try to shut down gracefully + executorService.shutdown(); + // wait 10 secs till terminated + try { + if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { + log.info("Thread Pool [id] " + identifier + " did not finish all tasks before " + + "timeout, forcefully shutting down"); + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + // interrupted, shutdown now + executorService.shutdownNow(); + } + + log.info("Successfully shutdown thread pool associated with id: " + identifier); + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java index 5ac89e6..5040371 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java @@ -24,6 +24,7 @@ import java.util.concurrent.ExecutorService; public class StratosEventReceiver { protected ExecutorService executorService; + protected String threadPoolId; public StratosEventReceiver () { } http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java index 69dba01..697d9dd 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java @@ -39,7 +39,8 @@ public class ApplicationsEventReceiver extends StratosEventReceiver{ private ApplicationsEventReceiver() { // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("application-event-receiver", 100); + this.threadPoolId = "application-event-receiver"; + this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20); ApplicationsEventMessageQueue messageQueue = new 
ApplicationsEventMessageQueue(); this.messageDelegator = new ApplicationsEventMessageDelegator(messageQueue); this.messageListener = new ApplicationsEventMessageListener(messageQueue); @@ -94,6 +95,7 @@ public class ApplicationsEventReceiver extends StratosEventReceiver{ public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); + StratosThreadPool.shutdown(threadPoolId); } public void initializeCompleteApplicationsModel() { http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java index df90cf9..89cf7ea 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java @@ -44,7 +44,8 @@ public class ApplicationSignUpEventReceiver extends StratosEventReceiver { private ApplicationSignUpEventReceiver() { // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("application-signup-event-receiver", 100); + this.threadPoolId = "application-signup-event-receiver"; + this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20); ApplicationSignUpEventMessageQueue messageQueue = new ApplicationSignUpEventMessageQueue(); this.messageDelegator = new 
ApplicationSignUpEventMessageDelegator(messageQueue); this.messageListener = new ApplicationSignUpEventMessageListener(messageQueue); @@ -116,5 +117,6 @@ public class ApplicationSignUpEventReceiver extends StratosEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); + StratosThreadPool.shutdown(threadPoolId); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java index be42b43..bdbd509 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java @@ -40,7 +40,8 @@ public class ClusterStatusEventReceiver extends StratosEventReceiver { private ClusterStatusEventReceiver() { // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("clusterstatus-event-receiver", 100); + this.threadPoolId = "clusterstatus-event-receiver"; + this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20); ClusterStatusEventMessageQueue messageQueue = new ClusterStatusEventMessageQueue(); this.messageDelegator = new ClusterStatusEventMessageDelegator(messageQueue); this.messageListener = new ClusterStatusEventMessageListener(messageQueue); @@ -90,6 +91,7 @@ public class ClusterStatusEventReceiver extends 
StratosEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); + StratosThreadPool.shutdown(threadPoolId); } public boolean isSubscribed() { http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java index 6de99c0..7ce3982 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java @@ -41,7 +41,8 @@ public class DomainMappingEventReceiver extends StratosEventReceiver { private DomainMappingEventReceiver() { // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("domainmapping-event-receiver", 100); + this.threadPoolId = "domainmapping-event-receiver"; + this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20); DomainMappingEventMessageQueue messageQueue = new DomainMappingEventMessageQueue(); this.messageDelegator = new DomainMappingEventMessageDelegator(messageQueue); this.messageListener = new DomainMappingEventMessageListener(messageQueue); @@ -67,6 +68,7 @@ public class DomainMappingEventReceiver extends StratosEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); + StratosThreadPool.shutdown(threadPoolId); } private void execute() { 
http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java index a9d2602..1699592 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java @@ -40,7 +40,8 @@ public class HealthStatEventReceiver extends StratosEventReceiver { private HealthStatEventReceiver() { // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("healthstat-event-receiver", 100); + this.threadPoolId = "healthstat-event-receiver"; + this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 100); HealthStatEventMessageQueue messageQueue = new HealthStatEventMessageQueue(); this.messageDelegator = new HealthStatEventMessageDelegator(messageQueue); this.messageListener = new HealthStatEventMessageListener(messageQueue); @@ -86,5 +87,6 @@ public class HealthStatEventReceiver extends StratosEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); + StratosThreadPool.shutdown(threadPoolId); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java 
---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java index 805a8bf..6c948a0 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java @@ -39,7 +39,8 @@ public class InitializerEventReceiver extends StratosEventReceiver { private InitializerEventReceiver() { // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("initializer-event-receiver", 100); + this.threadPoolId = "initializer-event-receiver"; + this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20); InitializerEventMessageQueue initializerEventMessageQueue = new InitializerEventMessageQueue(); this.messageDelegator = new InitializerEventMessageDelegator(initializerEventMessageQueue); this.messageListener = new InitializerEventMessageListener(initializerEventMessageQueue); @@ -84,5 +85,6 @@ public class InitializerEventReceiver extends StratosEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); + StratosThreadPool.shutdown(threadPoolId); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java ---------------------------------------------------------------------- diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java index e0b8e9f..e8f84c2 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java @@ -36,11 +36,11 @@ public class InstanceNotifierEventReceiver extends StratosEventReceiver { private EventSubscriber eventSubscriber; private InstanceNotifierEventMessageListener messageListener; private static volatile InstanceNotifierEventReceiver instance; - //private boolean terminated; private InstanceNotifierEventReceiver() { // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100); + this.threadPoolId = "instance-notifier-event-receiver"; + this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20); InstanceNotifierEventMessageQueue messageQueue = new InstanceNotifierEventMessageQueue(); this.messageDelegator = new InstanceNotifierEventMessageDelegator(messageQueue); this.messageListener = new InstanceNotifierEventMessageListener(messageQueue); @@ -94,7 +94,6 @@ public class InstanceNotifierEventReceiver extends StratosEventReceiver { public synchronized void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); - //terminated = true; - log.info("InstanceNotifierEventReceiver terminated"); + StratosThreadPool.shutdown(threadPoolId); } } 
http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java index a565ea9..233de18 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java @@ -39,7 +39,8 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver { private InstanceStatusEventReceiver() { // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100); + this.threadPoolId = "instance-status_event-reciever"; + this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20); InstanceStatusEventMessageQueue messageQueue = new InstanceStatusEventMessageQueue(); this.messageDelegator = new InstanceStatusEventMessageDelegator(messageQueue); this.messageListener = new InstanceStatusEventMessageListener(messageQueue); @@ -91,6 +92,6 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); - // terminated = true; + StratosThreadPool.shutdown(threadPoolId); } } 
http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java index a52cb20..5726d44 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java @@ -43,7 +43,8 @@ public class TenantEventReceiver extends StratosEventReceiver { private TenantEventReceiver() { // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("tenant-event-receiver", 100); + this.threadPoolId = "tenant-event-receiver"; + this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20); TenantEventMessageQueue messageQueue = new TenantEventMessageQueue(); this.messageDelegator = new TenantEventMessageDelegator(messageQueue); this.messageListener = new TenantEventMessageListener(messageQueue); @@ -112,5 +113,6 @@ public class TenantEventReceiver extends StratosEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); + StratosThreadPool.shutdown(threadPoolId); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/570d74ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java ---------------------------------------------------------------------- diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java index bfa3950..776b2b3 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java @@ -45,7 +45,8 @@ public class TopologyEventReceiver extends StratosEventReceiver { private TopologyEventReceiver() { // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100); + this.threadPoolId = "topology-event-receiver"; + this.executorService = StratosThreadPool.getExecutorService(threadPoolId, 20); TopologyEventMessageQueue messageQueue = new TopologyEventMessageQueue(); this.messageDelegator = new TopologyEventMessageDelegator(messageQueue); this.messageListener = new TopologyEventMessageListener(messageQueue); @@ -95,6 +96,7 @@ public class TopologyEventReceiver extends StratosEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); + StratosThreadPool.shutdown(threadPoolId); } public void initializeCompleteTopology() {
