adding comments for StratosEventReceiver abstraction, starting the event 
reseivers from messaging activator and adding shutdown for tenant, application 
and signup synchronizers


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/7d1e5270
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/7d1e5270
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/7d1e5270

Branch: refs/heads/stratos-4.1.x
Commit: 7d1e52709e47b69db22893856863ee9a48558417
Parents: 1ed93dd
Author: Isuru Haththotuwa <[email protected]>
Authored: Thu Dec 24 16:55:07 2015 +0530
Committer: Isuru Haththotuwa <[email protected]>
Committed: Fri Dec 25 17:03:44 2015 +0530

----------------------------------------------------------------------
 .../CloudControllerServiceComponent.java        |  3 +
 .../common/threading/StratosThreadPool.java     |  3 +
 .../StratosManagerServiceComponent.java         |  3 +
 .../internal/MessagingServiceComponent.java     | 21 ++++++-
 .../message/receiver/StratosEventReceiver.java  | 62 +++++++++++++++++++-
 .../application/ApplicationsEventReceiver.java  |  4 --
 .../ApplicationSignUpEventMessageDelegator.java |  4 ++
 .../signup/ApplicationSignUpEventReceiver.java  |  8 +--
 .../ClusterStatusEventMessageDelegator.java     |  4 ++
 .../status/ClusterStatusEventReceiver.java      |  8 +--
 .../DomainMappingEventMessageDelegator.java     |  4 ++
 .../mapping/DomainMappingEventReceiver.java     |  8 +--
 .../stat/HealthStatEventMessageDelegator.java   |  4 ++
 .../health/stat/HealthStatEventReceiver.java    |  7 +--
 .../InitializerEventMessageDelegator.java       |  4 ++
 .../initializer/InitializerEventReceiver.java   |  8 +--
 .../InstanceNotifierEventMessageDelegator.java  |  4 ++
 .../notifier/InstanceNotifierEventReceiver.java |  8 +--
 .../InstanceStatusEventMessageDelegator.java    |  4 ++
 .../status/InstanceStatusEventReceiver.java     |  7 +--
 .../tenant/TenantEventMessageDelegator.java     |  4 ++
 .../receiver/tenant/TenantEventReceiver.java    |  8 +--
 .../topology/TopologyEventMessageDelegator.java |  4 ++
 .../topology/TopologyEventReceiver.java         |  8 +--
 24 files changed, 159 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index 62f3f29..74b9804 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -228,5 +228,8 @@ public class CloudControllerServiceComponent {
         } catch (Exception e) {
             log.warn("An error occurred while closing cloud controller 
topology event publisher", e);
         }
+
+        // shutdown TopologyEventSync task
+        StratosThreadPool.shutdown(THREAD_POOL_ID);
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
index b72ac84..da48caf 100644
--- 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
+++ 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
@@ -256,6 +256,9 @@ public class StratosThreadPool {
             executorService.shutdownNow();
         }
 
+        // remove from the map
+        executorServiceMap.remove(identifier);
+
         log.info("Successfully shutdown thread pool associated with id: " + 
identifier);
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
index 0dbc417..04ec264 100644
--- 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
+++ 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
@@ -327,5 +327,8 @@ public class StratosManagerServiceComponent {
         // Close event publisher connections to message broker
         
EventPublisherPool.close(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName());
         
EventPublisherPool.close(MessagingUtil.Topics.TENANT_TOPIC.getTopicName());
+
+        // shut down the scheduled thread pool
+        StratosThreadPool.shutdown(THREAD_POOL_ID);
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
index c97125b..b582d56 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
@@ -21,6 +21,8 @@ package org.apache.stratos.messaging.internal;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
+import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
 import 
org.apache.stratos.messaging.message.receiver.application.ApplicationsEventReceiver;
 import 
org.apache.stratos.messaging.message.receiver.application.signup.ApplicationSignUpEventReceiver;
 import 
org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver;
@@ -40,8 +42,20 @@ public class MessagingServiceComponent {
     private static final Log log = 
LogFactory.getLog(MessagingServiceComponent.class);
 
     protected void activate(ComponentContext context) {
+        // activate all message receivers
         try {
-            log.info("Messaging Service bundle activated");
+            ApplicationSignUpEventReceiver.getInstance();
+            ApplicationsEventReceiver.getInstance();
+            ClusterStatusEventReceiver.getInstance();
+            DomainMappingEventReceiver.getInstance();
+            HealthStatEventReceiver.getInstance();
+            InitializerEventReceiver.getInstance();
+            TenantEventReceiver.getInstance();
+            TopologyEventReceiver.getInstance();
+
+            if (log.isDebugEnabled()) {
+                log.debug("Messaging Service bundle activated");
+            }
         } catch (Exception e) {
             log.error("Could not activate Messaging Service component", e);
         }
@@ -58,7 +72,10 @@ public class MessagingServiceComponent {
             InitializerEventReceiver.getInstance().terminate();
             TenantEventReceiver.getInstance().terminate();
             TopologyEventReceiver.getInstance().terminate();
-            log.info("Messaging Service component is deactivated");
+            
StratosThreadPool.shutdown(StratosEventReceiver.STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID);
+            if (log.isDebugEnabled()) {
+                log.debug("Messaging Service component is deactivated");
+            }
         } catch (Exception e) {
             log.error("Could not de-activate Messaging Service component", e);
         }

http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
index e86a05f..67258c3 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
@@ -20,12 +20,72 @@
 package org.apache.stratos.messaging.message.receiver;
 
 import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
+import org.apache.stratos.messaging.listener.EventListener;
 
-public class StratosEventReceiver {
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Abstraction for Event Receivers used in Stratos
+ */
+public abstract class StratosEventReceiver {
 
     protected ThreadPoolExecutor executor;
+    private static final Log log = 
LogFactory.getLog(StratosEventReceiver.class);
+
+    /**
+     * Thread pool information for all StratosEventReceiver implementations
+     */
+
+    public static String STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID = 
"stratos-event-receiver-pool";
+    private static String STRATOS_EVENT_RECEIEVER_THREAD_POOL_SIZE = 
"stratos.event.receiver.pool.size";
+
+    // thread pool id
     protected String threadPoolId;
+    // executor service used
+    protected ExecutorService executorService;
+    // pool size
+    protected static int threadPoolSize = 15;
+
+    static {
+        // check if the thread pool size is given as a system parameter
+        String poolSize = 
System.getProperty(STRATOS_EVENT_RECEIEVER_THREAD_POOL_SIZE);
+        if (poolSize != null) {
+            try {
+                threadPoolSize = Integer.parseInt(poolSize);
+            } catch (NumberFormatException e) {
+                log.error("Invalid configuration found for 
StratosEventReceiver thread pool size", e);
+                threadPoolSize = 15;
+            }
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Number of threads used in pool " + 
STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID + " : " + threadPoolSize);
+        }
+    }
 
     public StratosEventReceiver () {
+        this.threadPoolId = STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID;
+        this.executorService = 
StratosThreadPool.getExecutorService(threadPoolId, threadPoolSize);
     }
+
+    /**
+     * Adds an EventListener to this StratosEventReceiver instance
+     *
+     * @param eventListener EventListener instance to add
+     */
+    public abstract void addEventListener(EventListener eventListener);
+
+    /**
+     * Removed an EventListener from this StratosEventReceiver instance
+     *
+     * @param eventListener EventListener instance to remove
+     */
+    public abstract void removeEventListener(EventListener eventListener);
+
+    /**
+     * Terminates this StratosEventReceiver instance
+     */
+    public abstract void terminate();
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
index df7a006..ddb8170 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
@@ -20,7 +20,6 @@ package 
org.apache.stratos.messaging.message.receiver.application;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
@@ -38,8 +37,6 @@ public class ApplicationsEventReceiver extends 
StratosEventReceiver{
     private static volatile ApplicationsEventReceiver instance;
 
     private ApplicationsEventReceiver() {
-        // TODO: make pool size configurable
-        this.executor = 
StratosThreadPool.getExecutorService("messaging-event-receiver", 35, 150);
         ApplicationsEventMessageQueue messageQueue = new 
ApplicationsEventMessageQueue();
         this.messageDelegator = new 
ApplicationsEventMessageDelegator(messageQueue);
         this.messageListener = new 
ApplicationsEventMessageListener(messageQueue);
@@ -94,7 +91,6 @@ public class ApplicationsEventReceiver extends 
StratosEventReceiver{
     public void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
-        StratosThreadPool.shutdown(threadPoolId);
     }
 
     public void initializeCompleteApplicationsModel() {

http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java
index adf805d..59374bb 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java
@@ -46,6 +46,10 @@ class ApplicationSignUpEventMessageDelegator implements 
Runnable {
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
index 3fd43c1..d784b28 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
@@ -21,7 +21,6 @@ package 
org.apache.stratos.messaging.message.receiver.application.signup;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
@@ -43,8 +42,6 @@ public class ApplicationSignUpEventReceiver extends 
StratosEventReceiver {
     private static volatile ApplicationSignUpEventReceiver instance;
 
     private ApplicationSignUpEventReceiver() {
-        // TODO: make pool size configurable
-        this.executor = 
StratosThreadPool.getExecutorService("messaging-event-receiver", 35, 150);
         ApplicationSignUpEventMessageQueue messageQueue = new 
ApplicationSignUpEventMessageQueue();
         this.messageDelegator = new 
ApplicationSignUpEventMessageDelegator(messageQueue);
         this.messageListener = new 
ApplicationSignUpEventMessageListener(messageQueue);
@@ -67,6 +64,10 @@ public class ApplicationSignUpEventReceiver extends 
StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
+
     private void execute() {
         try {
             // Start topic subscriber thread
@@ -116,6 +117,5 @@ public class ApplicationSignUpEventReceiver extends 
StratosEventReceiver {
     public void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
-        StratosThreadPool.shutdown(threadPoolId);
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
index 5c9c502..954d9be 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
@@ -46,6 +46,10 @@ class ClusterStatusEventMessageDelegator implements Runnable 
{
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
index e014a98..3c4c14c 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
@@ -21,7 +21,6 @@ package 
org.apache.stratos.messaging.message.receiver.cluster.status;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
 import org.apache.stratos.messaging.listener.EventListener;
 import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
@@ -38,8 +37,6 @@ public class ClusterStatusEventReceiver extends 
StratosEventReceiver {
     private static volatile ClusterStatusEventReceiver instance;
 
     private ClusterStatusEventReceiver() {
-        // TODO: make pool size configurable
-        this.executor = 
StratosThreadPool.getExecutorService("messaging-event-receiver", 35, 150);
         ClusterStatusEventMessageQueue messageQueue = new 
ClusterStatusEventMessageQueue();
         this.messageDelegator = new 
ClusterStatusEventMessageDelegator(messageQueue);
         this.messageListener = new 
ClusterStatusEventMessageListener(messageQueue);
@@ -50,6 +47,10 @@ public class ClusterStatusEventReceiver extends 
StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
+
     public static ClusterStatusEventReceiver getInstance () {
         if (instance == null) {
             synchronized (ClusterStatusEventReceiver.class) {
@@ -89,7 +90,6 @@ public class ClusterStatusEventReceiver extends 
StratosEventReceiver {
     public void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
-        StratosThreadPool.shutdown(threadPoolId);
     }
 
     public boolean isSubscribed() {

http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java
index fa783a9..03154f2 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java
@@ -46,6 +46,10 @@ class DomainMappingEventMessageDelegator implements Runnable 
{
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
index 4d8bca6..33bec8f 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
@@ -21,7 +21,6 @@ package 
org.apache.stratos.messaging.message.receiver.domain.mapping;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
 import org.apache.stratos.messaging.listener.EventListener;
 import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
@@ -40,8 +39,6 @@ public class DomainMappingEventReceiver extends 
StratosEventReceiver {
     private static volatile DomainMappingEventReceiver instance;
 
     private DomainMappingEventReceiver() {
-        // TODO: make pool size configurable
-        this.executor = 
StratosThreadPool.getExecutorService("messaging-event-receiver", 35, 150);
         DomainMappingEventMessageQueue messageQueue = new 
DomainMappingEventMessageQueue();
         this.messageDelegator = new 
DomainMappingEventMessageDelegator(messageQueue);
         this.messageListener = new 
DomainMappingEventMessageListener(messageQueue);
@@ -52,6 +49,10 @@ public class DomainMappingEventReceiver extends 
StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
+
     public static DomainMappingEventReceiver getInstance () {
         if (instance == null) {
             synchronized (DomainMappingEventReceiver.class) {
@@ -67,7 +68,6 @@ public class DomainMappingEventReceiver extends 
StratosEventReceiver {
     public void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
-        StratosThreadPool.shutdown(threadPoolId);
     }
 
     private void execute() {

http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
index 2cde2a9..29fb47b 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
@@ -48,6 +48,10 @@ class HealthStatEventMessageDelegator implements Runnable {
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
index 689f9ca..69a717c 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
@@ -21,7 +21,6 @@ package 
org.apache.stratos.messaging.message.receiver.health.stat;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
 import org.apache.stratos.messaging.listener.EventListener;
 import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
@@ -39,8 +38,6 @@ public class HealthStatEventReceiver extends 
StratosEventReceiver {
     private static volatile HealthStatEventReceiver instance;
 
     private HealthStatEventReceiver() {
-        // TODO: make pool size configurable
-        this.executor = 
StratosThreadPool.getExecutorService("messaging-event-receiver", 35, 150);
         HealthStatEventMessageQueue messageQueue = new 
HealthStatEventMessageQueue();
         this.messageDelegator = new 
HealthStatEventMessageDelegator(messageQueue);
         this.messageListener = new 
HealthStatEventMessageListener(messageQueue);
@@ -63,6 +60,9 @@ public class HealthStatEventReceiver extends 
StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
 
     private void execute() {
         try {
@@ -86,6 +86,5 @@ public class HealthStatEventReceiver extends 
StratosEventReceiver {
     public void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
-        StratosThreadPool.shutdown(threadPoolId);
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
index ffd2ae4..baca350 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
@@ -41,6 +41,10 @@ public class InitializerEventMessageDelegator implements 
Runnable {
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
index 0711293..a8af4b7 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
@@ -20,7 +20,6 @@ package 
org.apache.stratos.messaging.message.receiver.initializer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
 import org.apache.stratos.messaging.listener.EventListener;
 import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
@@ -35,8 +34,6 @@ public class InitializerEventReceiver extends 
StratosEventReceiver {
     private static volatile InitializerEventReceiver instance;
 
     private InitializerEventReceiver() {
-        // TODO: make pool size configurable
-        this.executor = 
StratosThreadPool.getExecutorService("messaging-event-receiver", 35, 150);
         InitializerEventMessageQueue initializerEventMessageQueue = new 
InitializerEventMessageQueue();
         this.messageDelegator = new 
InitializerEventMessageDelegator(initializerEventMessageQueue);
         this.messageListener = new 
InitializerEventMessageListener(initializerEventMessageQueue);
@@ -59,6 +56,10 @@ public class InitializerEventReceiver extends 
StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
+
     private void execute() {
         try {
             // Start topic subscriber thread
@@ -81,6 +82,5 @@ public class InitializerEventReceiver extends 
StratosEventReceiver {
     public void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
-        StratosThreadPool.shutdown(threadPoolId);
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
index 73ef9fe..b695db7 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
@@ -46,6 +46,10 @@ class InstanceNotifierEventMessageDelegator implements 
Runnable {
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
index 4d02d18..520f64e 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
@@ -21,7 +21,6 @@ package 
org.apache.stratos.messaging.message.receiver.instance.notifier;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
 import org.apache.stratos.messaging.listener.EventListener;
 import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
@@ -38,8 +37,6 @@ public class InstanceNotifierEventReceiver extends 
StratosEventReceiver {
     private static volatile InstanceNotifierEventReceiver instance;
 
     private InstanceNotifierEventReceiver() {
-        // TODO: make pool size configurable
-        this.executor = 
StratosThreadPool.getExecutorService("messaging-event-receiver", 35, 150);
         InstanceNotifierEventMessageQueue messageQueue = new 
InstanceNotifierEventMessageQueue();
         this.messageDelegator = new 
InstanceNotifierEventMessageDelegator(messageQueue);
         this.messageListener = new 
InstanceNotifierEventMessageListener(messageQueue);
@@ -62,6 +59,10 @@ public class InstanceNotifierEventReceiver extends 
StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
+
     private void execute() {
         try {
             // Start topic subscriber thread
@@ -93,6 +94,5 @@ public class InstanceNotifierEventReceiver extends 
StratosEventReceiver {
     public synchronized void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
-        StratosThreadPool.shutdown(threadPoolId);
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java
index 9f754b0..e5df65e 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java
@@ -46,6 +46,10 @@ class InstanceStatusEventMessageDelegator implements 
Runnable {
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
index 9e98155..f06f629 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
@@ -21,7 +21,6 @@ package 
org.apache.stratos.messaging.message.receiver.instance.status;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
 import org.apache.stratos.messaging.listener.EventListener;
 import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
@@ -38,8 +37,6 @@ public class InstanceStatusEventReceiver extends 
StratosEventReceiver {
     private static volatile InstanceStatusEventReceiver instance;
 
     private InstanceStatusEventReceiver() {
-        // TODO: make pool size configurable
-        this.executor = 
StratosThreadPool.getExecutorService("messaging-event-receiver", 35, 150);
         InstanceStatusEventMessageQueue messageQueue = new 
InstanceStatusEventMessageQueue();
         this.messageDelegator = new 
InstanceStatusEventMessageDelegator(messageQueue);
         this.messageListener = new 
InstanceStatusEventMessageListener(messageQueue);
@@ -62,6 +59,9 @@ public class InstanceStatusEventReceiver extends 
StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
 
     private void execute() {
         try {
@@ -91,6 +91,5 @@ public class InstanceStatusEventReceiver extends 
StratosEventReceiver {
     public void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
-        StratosThreadPool.shutdown(threadPoolId);
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
index c735d9b..cd8724c 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
@@ -48,6 +48,10 @@ class TenantEventMessageDelegator implements Runnable {
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
index 94feef0..b03f51a 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
@@ -21,7 +21,6 @@ package org.apache.stratos.messaging.message.receiver.tenant;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
@@ -42,8 +41,6 @@ public class TenantEventReceiver extends StratosEventReceiver 
{
     private static volatile TenantEventReceiver instance;
 
     private TenantEventReceiver() {
-        // TODO: make pool size configurable
-        this.executor = 
StratosThreadPool.getExecutorService("messaging-event-receiver", 35, 150);
         TenantEventMessageQueue messageQueue = new TenantEventMessageQueue();
         this.messageDelegator = new TenantEventMessageDelegator(messageQueue);
         this.messageListener = new TenantEventMessageListener(messageQueue);
@@ -66,6 +63,10 @@ public class TenantEventReceiver extends 
StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
+
     private void execute() {
         try {
             // Start topic subscriber thread
@@ -112,6 +113,5 @@ public class TenantEventReceiver extends 
StratosEventReceiver {
     public void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
-        StratosThreadPool.shutdown(threadPoolId);
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
index 8508d91..d2664f4 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
@@ -47,6 +47,10 @@ class TopologyEventMessageDelegator implements Runnable {
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/7d1e5270/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
index 2fea887..fdd419b 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
@@ -21,7 +21,6 @@ package 
org.apache.stratos.messaging.message.receiver.topology;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.broker.subscribe.EventSubscriber;
@@ -44,8 +43,6 @@ public class TopologyEventReceiver extends 
StratosEventReceiver {
     private static volatile TopologyEventReceiver instance;
 
     private TopologyEventReceiver() {
-        // TODO: make pool size configurable
-        this.executor = 
StratosThreadPool.getExecutorService("messaging-event-receiver", 35, 150);
         TopologyEventMessageQueue messageQueue = new 
TopologyEventMessageQueue();
         this.messageDelegator = new 
TopologyEventMessageDelegator(messageQueue);
         this.messageListener = new TopologyEventMessageListener(messageQueue);
@@ -68,6 +65,10 @@ public class TopologyEventReceiver extends 
StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
+
     private void execute() {
         try {
             // Start topic subscriber thread
@@ -95,7 +96,6 @@ public class TopologyEventReceiver extends 
StratosEventReceiver {
     public void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
-        StratosThreadPool.shutdown(threadPoolId);
     }
 
     public void initializeCompleteTopology() {

Reply via email to