adding comments for StratosEventReceiver abstraction, starting the event receivers from messaging activator and adding shutdown for tenant, application and signup synchronizers
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/7d1e5270 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/7d1e5270 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/7d1e5270 Branch: refs/heads/stratos-4.1.x Commit: 7d1e52709e47b69db22893856863ee9a48558417 Parents: 1ed93dd Author: Isuru Haththotuwa <[email protected]> Authored: Thu Dec 24 16:55:07 2015 +0530 Committer: Isuru Haththotuwa <[email protected]> Committed: Fri Dec 25 17:03:44 2015 +0530 ---------------------------------------------------------------------- .../CloudControllerServiceComponent.java | 3 + .../common/threading/StratosThreadPool.java | 3 + .../StratosManagerServiceComponent.java | 3 + .../internal/MessagingServiceComponent.java | 21 ++++++- .../message/receiver/StratosEventReceiver.java | 62 +++++++++++++++++++- .../application/ApplicationsEventReceiver.java | 4 -- .../ApplicationSignUpEventMessageDelegator.java | 4 ++ .../signup/ApplicationSignUpEventReceiver.java | 8 +-- .../ClusterStatusEventMessageDelegator.java | 4 ++ .../status/ClusterStatusEventReceiver.java | 8 +-- .../DomainMappingEventMessageDelegator.java | 4 ++ .../mapping/DomainMappingEventReceiver.java | 8 +-- .../stat/HealthStatEventMessageDelegator.java | 4 ++ .../health/stat/HealthStatEventReceiver.java | 7 +-- .../InitializerEventMessageDelegator.java | 4 ++ .../initializer/InitializerEventReceiver.java | 8 +-- .../InstanceNotifierEventMessageDelegator.java | 4 ++ .../notifier/InstanceNotifierEventReceiver.java | 8 +-- .../InstanceStatusEventMessageDelegator.java | 4 ++ .../status/InstanceStatusEventReceiver.java | 7 +-- .../tenant/TenantEventMessageDelegator.java | 4 ++ .../receiver/tenant/TenantEventReceiver.java | 8 +-- .../topology/TopologyEventMessageDelegator.java | 4 ++ .../topology/TopologyEventReceiver.java | 8 +-- 24 files changed, 159 insertions(+), 43 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java index 62f3f29..74b9804 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java @@ -228,5 +228,8 @@ public class CloudControllerServiceComponent { } catch (Exception e) { log.warn("An error occurred while closing cloud controller topology event publisher", e); } + + // shutdown TopologyEventSync task + StratosThreadPool.shutdown(THREAD_POOL_ID); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java index b72ac84..da48caf 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java @@ -256,6 
+256,9 @@ public class StratosThreadPool { executorService.shutdownNow(); } + // remove from the map + executorServiceMap.remove(identifier); + log.info("Successfully shutdown thread pool associated with id: " + identifier); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java index 0dbc417..04ec264 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java @@ -327,5 +327,8 @@ public class StratosManagerServiceComponent { // Close event publisher connections to message broker EventPublisherPool.close(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName()); EventPublisherPool.close(MessagingUtil.Topics.TENANT_TOPIC.getTopicName()); + + // shut down the scheduled thread pool + StratosThreadPool.shutdown(THREAD_POOL_ID); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java index c97125b..b582d56 100644 --- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java @@ -21,6 +21,8 @@ package org.apache.stratos.messaging.internal; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.common.threading.StratosThreadPool; +import org.apache.stratos.messaging.message.receiver.StratosEventReceiver; import org.apache.stratos.messaging.message.receiver.application.ApplicationsEventReceiver; import org.apache.stratos.messaging.message.receiver.application.signup.ApplicationSignUpEventReceiver; import org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver; @@ -40,8 +42,20 @@ public class MessagingServiceComponent { private static final Log log = LogFactory.getLog(MessagingServiceComponent.class); protected void activate(ComponentContext context) { + // activate all message receivers try { - log.info("Messaging Service bundle activated"); + ApplicationSignUpEventReceiver.getInstance(); + ApplicationsEventReceiver.getInstance(); + ClusterStatusEventReceiver.getInstance(); + DomainMappingEventReceiver.getInstance(); + HealthStatEventReceiver.getInstance(); + InitializerEventReceiver.getInstance(); + TenantEventReceiver.getInstance(); + TopologyEventReceiver.getInstance(); + + if (log.isDebugEnabled()) { + log.debug("Messaging Service bundle activated"); + } } catch (Exception e) { log.error("Could not activate Messaging Service component", e); } @@ -58,7 +72,10 @@ public class MessagingServiceComponent { InitializerEventReceiver.getInstance().terminate(); TenantEventReceiver.getInstance().terminate(); TopologyEventReceiver.getInstance().terminate(); - log.info("Messaging Service component is deactivated"); + StratosThreadPool.shutdown(StratosEventReceiver.STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID); + if 
(log.isDebugEnabled()) { + log.debug("Messaging Service component is deactivated"); + } } catch (Exception e) { log.error("Could not de-activate Messaging Service component", e); } http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java index e86a05f..67258c3 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java @@ -20,12 +20,72 @@ package org.apache.stratos.messaging.message.receiver; import java.util.concurrent.ThreadPoolExecutor; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.common.threading.StratosThreadPool; +import org.apache.stratos.messaging.listener.EventListener; -public class StratosEventReceiver { +import java.util.concurrent.ExecutorService; + +/** + * Abstraction for Event Receivers used in Stratos + */ +public abstract class StratosEventReceiver { protected ThreadPoolExecutor executor; + private static final Log log = LogFactory.getLog(StratosEventReceiver.class); + + /** + * Thread pool information for all StratosEventReceiver implementations + */ + + public static String STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID = "stratos-event-receiver-pool"; + private static String STRATOS_EVENT_RECEIEVER_THREAD_POOL_SIZE = "stratos.event.receiver.pool.size"; + + // thread pool id protected String threadPoolId; + // executor 
service used + protected ExecutorService executorService; + // pool size + protected static int threadPoolSize = 15; + + static { + // check if the thread pool size is given as a system parameter + String poolSize = System.getProperty(STRATOS_EVENT_RECEIEVER_THREAD_POOL_SIZE); + if (poolSize != null) { + try { + threadPoolSize = Integer.parseInt(poolSize); + } catch (NumberFormatException e) { + log.error("Invalid configuration found for StratosEventReceiver thread pool size", e); + threadPoolSize = 15; + } + } + if (log.isDebugEnabled()) { + log.debug("Number of threads used in pool " + STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID + " : " + threadPoolSize); + } + } public StratosEventReceiver () { + this.threadPoolId = STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID; + this.executorService = StratosThreadPool.getExecutorService(threadPoolId, threadPoolSize); } + + /** + * Adds an EventListener to this StratosEventReceiver instance + * + * @param eventListener EventListener instance to add + */ + public abstract void addEventListener(EventListener eventListener); + + /** + * Removed an EventListener from this StratosEventReceiver instance + * + * @param eventListener EventListener instance to remove + */ + public abstract void removeEventListener(EventListener eventListener); + + /** + * Terminates this StratosEventReceiver instance + */ + public abstract void terminate(); } http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java index df7a006..ddb8170 100644 --- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java @@ -20,7 +20,6 @@ package org.apache.stratos.messaging.message.receiver.application; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.messaging.broker.publish.EventPublisher; import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.broker.subscribe.EventSubscriber; @@ -38,8 +37,6 @@ public class ApplicationsEventReceiver extends StratosEventReceiver{ private static volatile ApplicationsEventReceiver instance; private ApplicationsEventReceiver() { - // TODO: make pool size configurable - this.executor = StratosThreadPool.getExecutorService("messaging-event-receiver", 35, 150); ApplicationsEventMessageQueue messageQueue = new ApplicationsEventMessageQueue(); this.messageDelegator = new ApplicationsEventMessageDelegator(messageQueue); this.messageListener = new ApplicationsEventMessageListener(messageQueue); @@ -94,7 +91,6 @@ public class ApplicationsEventReceiver extends StratosEventReceiver{ public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); - StratosThreadPool.shutdown(threadPoolId); } public void initializeCompleteApplicationsModel() { http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java ---------------------------------------------------------------------- diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java index adf805d..59374bb 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java @@ -46,6 +46,10 @@ class ApplicationSignUpEventMessageDelegator implements Runnable { processorChain.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + processorChain.removeEventListener(eventListener); + } + @Override public void run() { try { http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java index 3fd43c1..d784b28 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java @@ -21,7 +21,6 @@ package 
org.apache.stratos.messaging.message.receiver.application.signup; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.messaging.broker.publish.EventPublisher; import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.broker.subscribe.EventSubscriber; @@ -43,8 +42,6 @@ public class ApplicationSignUpEventReceiver extends StratosEventReceiver { private static volatile ApplicationSignUpEventReceiver instance; private ApplicationSignUpEventReceiver() { - // TODO: make pool size configurable - this.executor = StratosThreadPool.getExecutorService("messaging-event-receiver", 35, 150); ApplicationSignUpEventMessageQueue messageQueue = new ApplicationSignUpEventMessageQueue(); this.messageDelegator = new ApplicationSignUpEventMessageDelegator(messageQueue); this.messageListener = new ApplicationSignUpEventMessageListener(messageQueue); @@ -67,6 +64,10 @@ public class ApplicationSignUpEventReceiver extends StratosEventReceiver { messageDelegator.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + messageDelegator.removeEventListener(eventListener); + } + private void execute() { try { // Start topic subscriber thread @@ -116,6 +117,5 @@ public class ApplicationSignUpEventReceiver extends StratosEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); - StratosThreadPool.shutdown(threadPoolId); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java ---------------------------------------------------------------------- diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java index 5c9c502..954d9be 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java @@ -46,6 +46,10 @@ class ClusterStatusEventMessageDelegator implements Runnable { processorChain.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + processorChain.removeEventListener(eventListener); + } + @Override public void run() { try { http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java index e014a98..3c4c14c 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java @@ -21,7 +21,6 @@ package org.apache.stratos.messaging.message.receiver.cluster.status; import 
org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.messaging.broker.subscribe.EventSubscriber; import org.apache.stratos.messaging.listener.EventListener; import org.apache.stratos.messaging.message.receiver.StratosEventReceiver; @@ -38,8 +37,6 @@ public class ClusterStatusEventReceiver extends StratosEventReceiver { private static volatile ClusterStatusEventReceiver instance; private ClusterStatusEventReceiver() { - // TODO: make pool size configurable - this.executor = StratosThreadPool.getExecutorService("messaging-event-receiver", 35, 150); ClusterStatusEventMessageQueue messageQueue = new ClusterStatusEventMessageQueue(); this.messageDelegator = new ClusterStatusEventMessageDelegator(messageQueue); this.messageListener = new ClusterStatusEventMessageListener(messageQueue); @@ -50,6 +47,10 @@ public class ClusterStatusEventReceiver extends StratosEventReceiver { messageDelegator.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + messageDelegator.removeEventListener(eventListener); + } + public static ClusterStatusEventReceiver getInstance () { if (instance == null) { synchronized (ClusterStatusEventReceiver.class) { @@ -89,7 +90,6 @@ public class ClusterStatusEventReceiver extends StratosEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); - StratosThreadPool.shutdown(threadPoolId); } public boolean isSubscribed() { http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java ---------------------------------------------------------------------- diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java index fa783a9..03154f2 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java @@ -46,6 +46,10 @@ class DomainMappingEventMessageDelegator implements Runnable { processorChain.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + processorChain.removeEventListener(eventListener); + } + @Override public void run() { try { http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java index 4d8bca6..33bec8f 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java @@ -21,7 +21,6 @@ package org.apache.stratos.messaging.message.receiver.domain.mapping; import 
org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.messaging.broker.subscribe.EventSubscriber; import org.apache.stratos.messaging.listener.EventListener; import org.apache.stratos.messaging.message.receiver.StratosEventReceiver; @@ -40,8 +39,6 @@ public class DomainMappingEventReceiver extends StratosEventReceiver { private static volatile DomainMappingEventReceiver instance; private DomainMappingEventReceiver() { - // TODO: make pool size configurable - this.executor = StratosThreadPool.getExecutorService("messaging-event-receiver", 35, 150); DomainMappingEventMessageQueue messageQueue = new DomainMappingEventMessageQueue(); this.messageDelegator = new DomainMappingEventMessageDelegator(messageQueue); this.messageListener = new DomainMappingEventMessageListener(messageQueue); @@ -52,6 +49,10 @@ public class DomainMappingEventReceiver extends StratosEventReceiver { messageDelegator.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + messageDelegator.removeEventListener(eventListener); + } + public static DomainMappingEventReceiver getInstance () { if (instance == null) { synchronized (DomainMappingEventReceiver.class) { @@ -67,7 +68,6 @@ public class DomainMappingEventReceiver extends StratosEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); - StratosThreadPool.shutdown(threadPoolId); } private void execute() { http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java index 2cde2a9..29fb47b 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java @@ -48,6 +48,10 @@ class HealthStatEventMessageDelegator implements Runnable { processorChain.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + processorChain.removeEventListener(eventListener); + } + @Override public void run() { try { http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java index 689f9ca..69a717c 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java @@ -21,7 +21,6 @@ package org.apache.stratos.messaging.message.receiver.health.stat; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.messaging.broker.subscribe.EventSubscriber; import 
org.apache.stratos.messaging.listener.EventListener; import org.apache.stratos.messaging.message.receiver.StratosEventReceiver; @@ -39,8 +38,6 @@ public class HealthStatEventReceiver extends StratosEventReceiver { private static volatile HealthStatEventReceiver instance; private HealthStatEventReceiver() { - // TODO: make pool size configurable - this.executor = StratosThreadPool.getExecutorService("messaging-event-receiver", 35, 150); HealthStatEventMessageQueue messageQueue = new HealthStatEventMessageQueue(); this.messageDelegator = new HealthStatEventMessageDelegator(messageQueue); this.messageListener = new HealthStatEventMessageListener(messageQueue); @@ -63,6 +60,9 @@ public class HealthStatEventReceiver extends StratosEventReceiver { messageDelegator.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + messageDelegator.removeEventListener(eventListener); + } private void execute() { try { @@ -86,6 +86,5 @@ public class HealthStatEventReceiver extends StratosEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); - StratosThreadPool.shutdown(threadPoolId); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java index ffd2ae4..baca350 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java +++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java @@ -41,6 +41,10 @@ public class InitializerEventMessageDelegator implements Runnable { processorChain.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + processorChain.removeEventListener(eventListener); + } + @Override public void run() { try { http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java index 0711293..a8af4b7 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java @@ -20,7 +20,6 @@ package org.apache.stratos.messaging.message.receiver.initializer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.messaging.broker.subscribe.EventSubscriber; import org.apache.stratos.messaging.listener.EventListener; import org.apache.stratos.messaging.message.receiver.StratosEventReceiver; @@ -35,8 +34,6 @@ public class InitializerEventReceiver extends StratosEventReceiver { private static volatile InitializerEventReceiver instance; private InitializerEventReceiver() { - // TODO: make pool size 
configurable - this.executor = StratosThreadPool.getExecutorService("messaging-event-receiver", 35, 150); InitializerEventMessageQueue initializerEventMessageQueue = new InitializerEventMessageQueue(); this.messageDelegator = new InitializerEventMessageDelegator(initializerEventMessageQueue); this.messageListener = new InitializerEventMessageListener(initializerEventMessageQueue); @@ -59,6 +56,10 @@ public class InitializerEventReceiver extends StratosEventReceiver { messageDelegator.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + messageDelegator.removeEventListener(eventListener); + } + private void execute() { try { // Start topic subscriber thread @@ -81,6 +82,5 @@ public class InitializerEventReceiver extends StratosEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); - StratosThreadPool.shutdown(threadPoolId); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java index 73ef9fe..b695db7 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java @@ -46,6 +46,10 @@ class InstanceNotifierEventMessageDelegator 
implements Runnable { processorChain.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + processorChain.removeEventListener(eventListener); + } + @Override public void run() { try { http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java index 4d02d18..520f64e 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java @@ -21,7 +21,6 @@ package org.apache.stratos.messaging.message.receiver.instance.notifier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.messaging.broker.subscribe.EventSubscriber; import org.apache.stratos.messaging.listener.EventListener; import org.apache.stratos.messaging.message.receiver.StratosEventReceiver; @@ -38,8 +37,6 @@ public class InstanceNotifierEventReceiver extends StratosEventReceiver { private static volatile InstanceNotifierEventReceiver instance; private InstanceNotifierEventReceiver() { - // TODO: make pool size configurable - this.executor = StratosThreadPool.getExecutorService("messaging-event-receiver", 35, 150); InstanceNotifierEventMessageQueue 
messageQueue = new InstanceNotifierEventMessageQueue(); this.messageDelegator = new InstanceNotifierEventMessageDelegator(messageQueue); this.messageListener = new InstanceNotifierEventMessageListener(messageQueue); @@ -62,6 +59,10 @@ public class InstanceNotifierEventReceiver extends StratosEventReceiver { messageDelegator.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + messageDelegator.removeEventListener(eventListener); + } + private void execute() { try { // Start topic subscriber thread @@ -93,6 +94,5 @@ public class InstanceNotifierEventReceiver extends StratosEventReceiver { public synchronized void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); - StratosThreadPool.shutdown(threadPoolId); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java index 9f754b0..e5df65e 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java @@ -46,6 +46,10 @@ class InstanceStatusEventMessageDelegator implements Runnable { processorChain.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + 
processorChain.removeEventListener(eventListener); + } + @Override public void run() { try { http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java index 9e98155..f06f629 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java @@ -21,7 +21,6 @@ package org.apache.stratos.messaging.message.receiver.instance.status; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.messaging.broker.subscribe.EventSubscriber; import org.apache.stratos.messaging.listener.EventListener; import org.apache.stratos.messaging.message.receiver.StratosEventReceiver; @@ -38,8 +37,6 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver { private static volatile InstanceStatusEventReceiver instance; private InstanceStatusEventReceiver() { - // TODO: make pool size configurable - this.executor = StratosThreadPool.getExecutorService("messaging-event-receiver", 35, 150); InstanceStatusEventMessageQueue messageQueue = new InstanceStatusEventMessageQueue(); this.messageDelegator = new InstanceStatusEventMessageDelegator(messageQueue); this.messageListener = new 
InstanceStatusEventMessageListener(messageQueue); @@ -62,6 +59,9 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver { messageDelegator.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + messageDelegator.removeEventListener(eventListener); + } private void execute() { try { @@ -91,6 +91,5 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); - StratosThreadPool.shutdown(threadPoolId); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java index c735d9b..cd8724c 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java @@ -48,6 +48,10 @@ class TenantEventMessageDelegator implements Runnable { processorChain.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + processorChain.removeEventListener(eventListener); + } + @Override public void run() { try { http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java 
---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java index 94feef0..b03f51a 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java @@ -21,7 +21,6 @@ package org.apache.stratos.messaging.message.receiver.tenant; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.messaging.broker.publish.EventPublisher; import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.broker.subscribe.EventSubscriber; @@ -42,8 +41,6 @@ public class TenantEventReceiver extends StratosEventReceiver { private static volatile TenantEventReceiver instance; private TenantEventReceiver() { - // TODO: make pool size configurable - this.executor = StratosThreadPool.getExecutorService("messaging-event-receiver", 35, 150); TenantEventMessageQueue messageQueue = new TenantEventMessageQueue(); this.messageDelegator = new TenantEventMessageDelegator(messageQueue); this.messageListener = new TenantEventMessageListener(messageQueue); @@ -66,6 +63,10 @@ public class TenantEventReceiver extends StratosEventReceiver { messageDelegator.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + messageDelegator.removeEventListener(eventListener); + } + private void execute() { try { // Start topic subscriber thread @@ -112,6 +113,5 @@ public class TenantEventReceiver extends 
StratosEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); - StratosThreadPool.shutdown(threadPoolId); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java index 8508d91..d2664f4 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java @@ -47,6 +47,10 @@ class TopologyEventMessageDelegator implements Runnable { processorChain.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + processorChain.removeEventListener(eventListener); + } + @Override public void run() { try { http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java index 2fea887..fdd419b 100644 --- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java @@ -21,7 +21,6 @@ package org.apache.stratos.messaging.message.receiver.topology; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.messaging.broker.publish.EventPublisher; import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.broker.subscribe.EventSubscriber; @@ -44,8 +43,6 @@ public class TopologyEventReceiver extends StratosEventReceiver { private static volatile TopologyEventReceiver instance; private TopologyEventReceiver() { - // TODO: make pool size configurable - this.executor = StratosThreadPool.getExecutorService("messaging-event-receiver", 35, 150); TopologyEventMessageQueue messageQueue = new TopologyEventMessageQueue(); this.messageDelegator = new TopologyEventMessageDelegator(messageQueue); this.messageListener = new TopologyEventMessageListener(messageQueue); @@ -68,6 +65,10 @@ public class TopologyEventReceiver extends StratosEventReceiver { messageDelegator.addEventListener(eventListener); } + public void removeEventListener(EventListener eventListener) { + messageDelegator.removeEventListener(eventListener); + } + private void execute() { try { // Start topic subscriber thread @@ -95,7 +96,6 @@ public class TopologyEventReceiver extends StratosEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); - StratosThreadPool.shutdown(threadPoolId); } public void initializeCompleteTopology() {
