adding comments for StratosEventReceiver abstraction, starting the event 
receivers from messaging activator and adding shutdown for tenant, application 
and signup synchronizers

Conflicts:
        
components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
        
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
        
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
        
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
        
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
        
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
        
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
        
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
        
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
        
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
        
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
        
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/c90eb9a7
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/c90eb9a7
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/c90eb9a7

Branch: refs/heads/master
Commit: c90eb9a728ca6b9f2ef33dbd6ceece635ddb77ab
Parents: 905e140
Author: Isuru Haththotuwa <[email protected]>
Authored: Thu Dec 24 16:55:07 2015 +0530
Committer: Isuru Haththotuwa <[email protected]>
Committed: Thu Dec 24 20:12:43 2015 +0530

----------------------------------------------------------------------
 .../CloudControllerServiceComponent.java        |  3 +
 .../common/threading/StratosThreadPool.java     | 35 +++++++++--
 .../StratosManagerServiceComponent.java         |  3 +
 .../internal/MessagingServiceComponent.java     | 21 ++++++-
 .../message/receiver/StratosEventReceiver.java  | 61 +++++++++++++++++++-
 .../application/ApplicationsEventReceiver.java  |  4 +-
 .../ApplicationSignUpEventMessageDelegator.java |  4 ++
 .../signup/ApplicationSignUpEventReceiver.java  |  6 +-
 .../ClusterStatusEventMessageDelegator.java     |  4 ++
 .../status/ClusterStatusEventReceiver.java      |  6 +-
 .../DomainMappingEventMessageDelegator.java     |  4 ++
 .../mapping/DomainMappingEventReceiver.java     |  6 +-
 .../stat/HealthStatEventMessageDelegator.java   |  4 ++
 .../health/stat/HealthStatEventReceiver.java    |  5 +-
 .../InitializerEventMessageDelegator.java       |  4 ++
 .../initializer/InitializerEventReceiver.java   |  7 ++-
 .../InstanceNotifierEventMessageDelegator.java  |  4 ++
 .../notifier/InstanceNotifierEventReceiver.java |  8 +--
 .../InstanceStatusEventMessageDelegator.java    |  4 ++
 .../status/InstanceStatusEventReceiver.java     |  6 +-
 .../tenant/TenantEventMessageDelegator.java     |  4 ++
 .../receiver/tenant/TenantEventReceiver.java    |  6 +-
 .../topology/TopologyEventMessageDelegator.java |  4 ++
 .../topology/TopologyEventReceiver.java         |  6 +-
 24 files changed, 186 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index c30fc63..5b01330 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -229,5 +229,8 @@ public class CloudControllerServiceComponent {
         } catch (Exception e) {
             log.warn("An error occurred while closing cloud controller 
topology event publisher", e);
         }
+
+        // shutdown TopologyEventSync task
+        StratosThreadPool.shutdown(THREAD_POOL_ID);
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
index 687cec2..8037ce3 100644
--- 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
+++ 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
@@ -24,10 +24,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.*;
 
 /**
  * Utility class for Stratos thread pool
@@ -37,7 +34,7 @@ public class StratosThreadPool {
     private static final Log log = LogFactory.getLog(StratosThreadPool.class);
 
     private static volatile Map<String, ExecutorService> executorServiceMap = 
new ConcurrentHashMap<>();
-    private static volatile Map<String, ScheduledExecutorService> 
scheduledServiceMap = new ConcurrentHashMap<String, ScheduledExecutorService>();
+    private static volatile Map<String, ScheduledExecutorService> 
scheduledServiceMap = new ConcurrentHashMap<>();
     private static Object executorServiceMapLock = new Object();
     private static Object scheduledServiceMapLock = new Object();
 
@@ -84,4 +81,32 @@ public class StratosThreadPool {
         }
         return scheduledExecutorService;
     }
+
+    public static void shutdown (String identifier) {
+
+        ExecutorService executorService = executorServiceMap.get(identifier);
+        if (executorService == null) {
+            log.warn("No executor service found for id " + identifier + ", 
unable to shut down");
+            return;
+        }
+
+        // try to shut down gracefully
+        executorService.shutdown();
+        // wait 10 secs till terminated
+        try {
+            if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
+                log.info("Thread Pool [id] " + identifier + " did not finish 
all tasks before " +
+                        "timeout, forcefully shutting down");
+                executorService.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            // interrupted, shutdown now
+            executorService.shutdownNow();
+        }
+
+        // remove from the map
+        executorServiceMap.remove(identifier);
+
+        log.info("Successfully shutdown thread pool associated with id: " + 
identifier);
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
index aa7cc02..5bd3f76 100644
--- 
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
+++ 
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
@@ -328,5 +328,8 @@ public class StratosManagerServiceComponent {
         // Close event publisher connections to message broker
         
EventPublisherPool.close(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName());
         
EventPublisherPool.close(MessagingUtil.Topics.TENANT_TOPIC.getTopicName());
+
+        // shut down the scheduled thread pool
+        StratosThreadPool.shutdown(THREAD_POOL_ID);
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
index c97125b..b582d56 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
@@ -21,6 +21,8 @@ package org.apache.stratos.messaging.internal;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
+import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
 import 
org.apache.stratos.messaging.message.receiver.application.ApplicationsEventReceiver;
 import 
org.apache.stratos.messaging.message.receiver.application.signup.ApplicationSignUpEventReceiver;
 import 
org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver;
@@ -40,8 +42,20 @@ public class MessagingServiceComponent {
     private static final Log log = 
LogFactory.getLog(MessagingServiceComponent.class);
 
     protected void activate(ComponentContext context) {
+        // activate all message receivers
         try {
-            log.info("Messaging Service bundle activated");
+            ApplicationSignUpEventReceiver.getInstance();
+            ApplicationsEventReceiver.getInstance();
+            ClusterStatusEventReceiver.getInstance();
+            DomainMappingEventReceiver.getInstance();
+            HealthStatEventReceiver.getInstance();
+            InitializerEventReceiver.getInstance();
+            TenantEventReceiver.getInstance();
+            TopologyEventReceiver.getInstance();
+
+            if (log.isDebugEnabled()) {
+                log.debug("Messaging Service bundle activated");
+            }
         } catch (Exception e) {
             log.error("Could not activate Messaging Service component", e);
         }
@@ -58,7 +72,10 @@ public class MessagingServiceComponent {
             InitializerEventReceiver.getInstance().terminate();
             TenantEventReceiver.getInstance().terminate();
             TopologyEventReceiver.getInstance().terminate();
-            log.info("Messaging Service component is deactivated");
+            
StratosThreadPool.shutdown(StratosEventReceiver.STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID);
+            if (log.isDebugEnabled()) {
+                log.debug("Messaging Service component is deactivated");
+            }
         } catch (Exception e) {
             log.error("Could not de-activate Messaging Service component", e);
         }

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
index 5ac89e6..8c29816 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
@@ -19,12 +19,71 @@
 
 package org.apache.stratos.messaging.message.receiver;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
+import org.apache.stratos.messaging.listener.EventListener;
+
 import java.util.concurrent.ExecutorService;
 
-public class StratosEventReceiver {
+/**
+ * Abstraction for Event Receivers used in Stratos
+ */
+public abstract class StratosEventReceiver {
+
+    private static final Log log = 
LogFactory.getLog(StratosEventReceiver.class);
+
+    /**
+     * Thread pool information for all StratosEventReceiver implementations
+     */
+
+    public static String STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID = 
"stratos-event-receiver-pool";
+    private static String STRATOS_EVENT_RECEIEVER_THREAD_POOL_SIZE = 
"stratos.event.receiver.pool.size";
 
+    // thread pool id
+    protected String threadPoolId;
+    // executor service used
     protected ExecutorService executorService;
+    // pool size
+    protected static int threadPoolSize = 15;
+
+    static {
+        // check if the thread pool size is given as a system parameter
+        String poolSize = 
System.getProperty(STRATOS_EVENT_RECEIEVER_THREAD_POOL_SIZE);
+        if (poolSize != null) {
+            try {
+                threadPoolSize = Integer.parseInt(poolSize);
+            } catch (NumberFormatException e) {
+                log.error("Invalid configuration found for 
StratosEventReceiver thread pool size", e);
+                threadPoolSize = 15;
+            }
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Number of threads used in pool " + 
STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID + " : " + threadPoolSize);
+        }
+    }
 
     public StratosEventReceiver () {
+        this.threadPoolId = STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID;
+        this.executorService = 
StratosThreadPool.getExecutorService(threadPoolId, threadPoolSize);
     }
+
+    /**
+     * Adds an EventListener to this StratosEventReceiver instance
+     *
+     * @param eventListener EventListener instance to add
+     */
+    public abstract void addEventListener(EventListener eventListener);
+
+    /**
+     * Removed an EventListener from this StratosEventReceiver instance
+     *
+     * @param eventListener EventListener instance to remove
+     */
+    public abstract void removeEventListener(EventListener eventListener);
+
+    /**
+     * Terminates this StratosEventReceiver instance
+     */
+    public abstract void terminate();
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
index 69dba01..89dd73e 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
@@ -38,8 +38,6 @@ public class ApplicationsEventReceiver extends 
StratosEventReceiver{
     private static volatile ApplicationsEventReceiver instance;
 
     private ApplicationsEventReceiver() {
-        // TODO: make pool size configurable
-        this.executorService = 
StratosThreadPool.getExecutorService("application-event-receiver", 100);
         ApplicationsEventMessageQueue messageQueue = new 
ApplicationsEventMessageQueue();
         this.messageDelegator = new 
ApplicationsEventMessageDelegator(messageQueue);
         this.messageListener = new 
ApplicationsEventMessageListener(messageQueue);
@@ -66,7 +64,7 @@ public class ApplicationsEventReceiver extends 
StratosEventReceiver{
         messageDelegator.removeEventListener(eventListener);
     }
 
-    public void execute() {
+    private void execute() {
         try {
             // Start topic subscriber thread
             eventSubscriber = new 
EventSubscriber(MessagingUtil.Topics.APPLICATION_TOPIC.getTopicName(),

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java
index adf805d..59374bb 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java
@@ -46,6 +46,10 @@ class ApplicationSignUpEventMessageDelegator implements 
Runnable {
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
index df90cf9..5ad6070 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
@@ -43,8 +43,6 @@ public class ApplicationSignUpEventReceiver extends 
StratosEventReceiver {
     private static volatile ApplicationSignUpEventReceiver instance;
 
     private ApplicationSignUpEventReceiver() {
-        // TODO: make pool size configurable
-        this.executorService = 
StratosThreadPool.getExecutorService("application-signup-event-receiver", 100);
         ApplicationSignUpEventMessageQueue messageQueue = new 
ApplicationSignUpEventMessageQueue();
         this.messageDelegator = new 
ApplicationSignUpEventMessageDelegator(messageQueue);
         this.messageListener = new 
ApplicationSignUpEventMessageListener(messageQueue);
@@ -67,6 +65,10 @@ public class ApplicationSignUpEventReceiver extends 
StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
+
     private void execute() {
         try {
             // Start topic subscriber thread

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
index 5c9c502..954d9be 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
@@ -46,6 +46,10 @@ class ClusterStatusEventMessageDelegator implements Runnable 
{
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
index be42b43..9de351b 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
@@ -39,8 +39,6 @@ public class ClusterStatusEventReceiver extends 
StratosEventReceiver {
     private static volatile ClusterStatusEventReceiver instance;
 
     private ClusterStatusEventReceiver() {
-        // TODO: make pool size configurable
-        this.executorService = 
StratosThreadPool.getExecutorService("clusterstatus-event-receiver", 100);
         ClusterStatusEventMessageQueue messageQueue = new 
ClusterStatusEventMessageQueue();
         this.messageDelegator = new 
ClusterStatusEventMessageDelegator(messageQueue);
         this.messageListener = new 
ClusterStatusEventMessageListener(messageQueue);
@@ -51,6 +49,10 @@ public class ClusterStatusEventReceiver extends 
StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
+
     public static ClusterStatusEventReceiver getInstance () {
         if (instance == null) {
             synchronized (ClusterStatusEventReceiver.class) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java
index fa783a9..03154f2 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java
@@ -46,6 +46,10 @@ class DomainMappingEventMessageDelegator implements Runnable 
{
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
index 6de99c0..6c88f73 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
@@ -40,8 +40,6 @@ public class DomainMappingEventReceiver extends 
StratosEventReceiver {
     private static volatile DomainMappingEventReceiver instance;
 
     private DomainMappingEventReceiver() {
-        // TODO: make pool size configurable
-        this.executorService = 
StratosThreadPool.getExecutorService("domainmapping-event-receiver", 100);
         DomainMappingEventMessageQueue messageQueue = new 
DomainMappingEventMessageQueue();
         this.messageDelegator = new 
DomainMappingEventMessageDelegator(messageQueue);
         this.messageListener = new 
DomainMappingEventMessageListener(messageQueue);
@@ -52,6 +50,10 @@ public class DomainMappingEventReceiver extends 
StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
+
     public static DomainMappingEventReceiver getInstance () {
         if (instance == null) {
             synchronized (DomainMappingEventReceiver.class) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
index 2cde2a9..29fb47b 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
@@ -48,6 +48,10 @@ class HealthStatEventMessageDelegator implements Runnable {
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
index a9d2602..442bdb6 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
@@ -39,8 +39,6 @@ public class HealthStatEventReceiver extends 
StratosEventReceiver {
     private static volatile HealthStatEventReceiver instance;
 
     private HealthStatEventReceiver() {
-        // TODO: make pool size configurable
-        this.executorService = 
StratosThreadPool.getExecutorService("healthstat-event-receiver", 100);
         HealthStatEventMessageQueue messageQueue = new 
HealthStatEventMessageQueue();
         this.messageDelegator = new 
HealthStatEventMessageDelegator(messageQueue);
         this.messageListener = new 
HealthStatEventMessageListener(messageQueue);
@@ -63,6 +61,9 @@ public class HealthStatEventReceiver extends 
StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
 
     private void execute() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
index ffd2ae4..baca350 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
@@ -41,6 +41,10 @@ public class InitializerEventMessageDelegator implements Runnable {
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
index 805a8bf..c7e5daf 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
@@ -26,7 +26,6 @@ import org.apache.stratos.messaging.listener.EventListener;
 import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
 import org.apache.stratos.messaging.util.MessagingUtil;
 
-import java.util.concurrent.ExecutorService;
 
 public class InitializerEventReceiver extends StratosEventReceiver {
     private static final Log log = LogFactory.getLog(InitializerEventReceiver.class);
@@ -38,8 +37,6 @@ public class InitializerEventReceiver extends StratosEventReceiver {
     //private ExecutorService executorService;
 
     private InitializerEventReceiver() {
-        // TODO: make pool size configurable
-        this.executorService = StratosThreadPool.getExecutorService("initializer-event-receiver", 100);
         InitializerEventMessageQueue initializerEventMessageQueue = new InitializerEventMessageQueue();
         this.messageDelegator = new InitializerEventMessageDelegator(initializerEventMessageQueue);
         this.messageListener = new InitializerEventMessageListener(initializerEventMessageQueue);
@@ -62,6 +59,10 @@ public class InitializerEventReceiver extends StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
+
     private void execute() {
         try {
             // Start topic subscriber thread

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
index 73ef9fe..b695db7 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
@@ -46,6 +46,10 @@ class InstanceNotifierEventMessageDelegator implements Runnable {
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
index e0b8e9f..5bcd75a 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
@@ -39,8 +39,6 @@ public class InstanceNotifierEventReceiver extends StratosEventReceiver {
     //private boolean terminated;
 
     private InstanceNotifierEventReceiver() {
-        // TODO: make pool size configurable
-        this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
         InstanceNotifierEventMessageQueue messageQueue = new InstanceNotifierEventMessageQueue();
         this.messageDelegator = new InstanceNotifierEventMessageDelegator(messageQueue);
         this.messageListener = new InstanceNotifierEventMessageListener(messageQueue);
@@ -63,6 +61,10 @@ public class InstanceNotifierEventReceiver extends StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
+
     private void execute() {
         try {
             // Start topic subscriber thread
@@ -94,7 +96,5 @@ public class InstanceNotifierEventReceiver extends StratosEventReceiver {
     public synchronized void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
-        //terminated = true;
-        log.info("InstanceNotifierEventReceiver terminated");
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java
index 9f754b0..e5df65e 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java
@@ -46,6 +46,10 @@ class InstanceStatusEventMessageDelegator implements Runnable {
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
index a565ea9..3d9f793 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
@@ -38,8 +38,6 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver {
     private static volatile InstanceStatusEventReceiver instance;
 
     private InstanceStatusEventReceiver() {
-        // TODO: make pool size configurable
-        this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
         InstanceStatusEventMessageQueue messageQueue = new InstanceStatusEventMessageQueue();
         this.messageDelegator = new InstanceStatusEventMessageDelegator(messageQueue);
         this.messageListener = new InstanceStatusEventMessageListener(messageQueue);
@@ -62,6 +60,9 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
 
     private void execute() {
         try {
@@ -91,6 +92,5 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver {
     public void terminate() {
         eventSubscriber.terminate();
         messageDelegator.terminate();
-       // terminated = true;
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
index c735d9b..cd8724c 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
@@ -48,6 +48,10 @@ class TenantEventMessageDelegator implements Runnable {
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
index a52cb20..e30d3ab 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
@@ -42,8 +42,6 @@ public class TenantEventReceiver extends StratosEventReceiver {
     private static volatile TenantEventReceiver instance;
 
     private TenantEventReceiver() {
-        // TODO: make pool size configurable
-        this.executorService = StratosThreadPool.getExecutorService("tenant-event-receiver", 100);
         TenantEventMessageQueue messageQueue = new TenantEventMessageQueue();
         this.messageDelegator = new TenantEventMessageDelegator(messageQueue);
         this.messageListener = new TenantEventMessageListener(messageQueue);
@@ -66,6 +64,10 @@ public class TenantEventReceiver extends StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
+
     private void execute() {
         try {
             // Start topic subscriber thread

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
index 8508d91..d2664f4 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
@@ -47,6 +47,10 @@ class TopologyEventMessageDelegator implements Runnable {
         processorChain.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        processorChain.removeEventListener(eventListener);
+    }
+
     @Override
     public void run() {
         try {

http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
index bfa3950..4f1f254 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
@@ -44,8 +44,6 @@ public class TopologyEventReceiver extends StratosEventReceiver {
     private static volatile TopologyEventReceiver instance;
 
     private TopologyEventReceiver() {
-        // TODO: make pool size configurable
-        this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100);
         TopologyEventMessageQueue messageQueue = new TopologyEventMessageQueue();
         this.messageDelegator = new TopologyEventMessageDelegator(messageQueue);
         this.messageListener = new TopologyEventMessageListener(messageQueue);
@@ -68,6 +66,10 @@ public class TopologyEventReceiver extends StratosEventReceiver {
         messageDelegator.addEventListener(eventListener);
     }
 
+    public void removeEventListener(EventListener eventListener) {
+        messageDelegator.removeEventListener(eventListener);
+    }
+
     private void execute() {
         try {
             // Start topic subscriber thread

Reply via email to