adding comments for StratosEventReceiver abstraction, starting the event
reseivers from messaging activator and adding shutdown for tenant, application
and signup synchronizers
Conflicts:
components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/c90eb9a7
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/c90eb9a7
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/c90eb9a7
Branch: refs/heads/master
Commit: c90eb9a728ca6b9f2ef33dbd6ceece635ddb77ab
Parents: 905e140
Author: Isuru Haththotuwa <[email protected]>
Authored: Thu Dec 24 16:55:07 2015 +0530
Committer: Isuru Haththotuwa <[email protected]>
Committed: Thu Dec 24 20:12:43 2015 +0530
----------------------------------------------------------------------
.../CloudControllerServiceComponent.java | 3 +
.../common/threading/StratosThreadPool.java | 35 +++++++++--
.../StratosManagerServiceComponent.java | 3 +
.../internal/MessagingServiceComponent.java | 21 ++++++-
.../message/receiver/StratosEventReceiver.java | 61 +++++++++++++++++++-
.../application/ApplicationsEventReceiver.java | 4 +-
.../ApplicationSignUpEventMessageDelegator.java | 4 ++
.../signup/ApplicationSignUpEventReceiver.java | 6 +-
.../ClusterStatusEventMessageDelegator.java | 4 ++
.../status/ClusterStatusEventReceiver.java | 6 +-
.../DomainMappingEventMessageDelegator.java | 4 ++
.../mapping/DomainMappingEventReceiver.java | 6 +-
.../stat/HealthStatEventMessageDelegator.java | 4 ++
.../health/stat/HealthStatEventReceiver.java | 5 +-
.../InitializerEventMessageDelegator.java | 4 ++
.../initializer/InitializerEventReceiver.java | 7 ++-
.../InstanceNotifierEventMessageDelegator.java | 4 ++
.../notifier/InstanceNotifierEventReceiver.java | 8 +--
.../InstanceStatusEventMessageDelegator.java | 4 ++
.../status/InstanceStatusEventReceiver.java | 6 +-
.../tenant/TenantEventMessageDelegator.java | 4 ++
.../receiver/tenant/TenantEventReceiver.java | 6 +-
.../topology/TopologyEventMessageDelegator.java | 4 ++
.../topology/TopologyEventReceiver.java | 6 +-
24 files changed, 186 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index c30fc63..5b01330 100644
---
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -229,5 +229,8 @@ public class CloudControllerServiceComponent {
} catch (Exception e) {
log.warn("An error occurred while closing cloud controller
topology event publisher", e);
}
+
+ // shutdown TopologyEventSync task
+ StratosThreadPool.shutdown(THREAD_POOL_ID);
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
index 687cec2..8037ce3 100644
---
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
+++
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
@@ -24,10 +24,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.*;
/**
* Utility class for Stratos thread pool
@@ -37,7 +34,7 @@ public class StratosThreadPool {
private static final Log log = LogFactory.getLog(StratosThreadPool.class);
private static volatile Map<String, ExecutorService> executorServiceMap =
new ConcurrentHashMap<>();
- private static volatile Map<String, ScheduledExecutorService>
scheduledServiceMap = new ConcurrentHashMap<String, ScheduledExecutorService>();
+ private static volatile Map<String, ScheduledExecutorService>
scheduledServiceMap = new ConcurrentHashMap<>();
private static Object executorServiceMapLock = new Object();
private static Object scheduledServiceMapLock = new Object();
@@ -84,4 +81,32 @@ public class StratosThreadPool {
}
return scheduledExecutorService;
}
+
+ public static void shutdown (String identifier) {
+
+ ExecutorService executorService = executorServiceMap.get(identifier);
+ if (executorService == null) {
+ log.warn("No executor service found for id " + identifier + ",
unable to shut down");
+ return;
+ }
+
+ // try to shut down gracefully
+ executorService.shutdown();
+ // wait 10 secs till terminated
+ try {
+ if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
+ log.info("Thread Pool [id] " + identifier + " did not finish
all tasks before " +
+ "timeout, forcefully shutting down");
+ executorService.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ // interrupted, shutdown now
+ executorService.shutdownNow();
+ }
+
+ // remove from the map
+ executorServiceMap.remove(identifier);
+
+ log.info("Successfully shutdown thread pool associated with id: " +
identifier);
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
index aa7cc02..5bd3f76 100644
---
a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
+++
b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java
@@ -328,5 +328,8 @@ public class StratosManagerServiceComponent {
// Close event publisher connections to message broker
EventPublisherPool.close(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName());
EventPublisherPool.close(MessagingUtil.Topics.TENANT_TOPIC.getTopicName());
+
+ // shut down the scheduled thread pool
+ StratosThreadPool.shutdown(THREAD_POOL_ID);
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
index c97125b..b582d56 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java
@@ -21,6 +21,8 @@ package org.apache.stratos.messaging.internal;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
+import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import
org.apache.stratos.messaging.message.receiver.application.ApplicationsEventReceiver;
import
org.apache.stratos.messaging.message.receiver.application.signup.ApplicationSignUpEventReceiver;
import
org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver;
@@ -40,8 +42,20 @@ public class MessagingServiceComponent {
private static final Log log =
LogFactory.getLog(MessagingServiceComponent.class);
protected void activate(ComponentContext context) {
+ // activate all message receivers
try {
- log.info("Messaging Service bundle activated");
+ ApplicationSignUpEventReceiver.getInstance();
+ ApplicationsEventReceiver.getInstance();
+ ClusterStatusEventReceiver.getInstance();
+ DomainMappingEventReceiver.getInstance();
+ HealthStatEventReceiver.getInstance();
+ InitializerEventReceiver.getInstance();
+ TenantEventReceiver.getInstance();
+ TopologyEventReceiver.getInstance();
+
+ if (log.isDebugEnabled()) {
+ log.debug("Messaging Service bundle activated");
+ }
} catch (Exception e) {
log.error("Could not activate Messaging Service component", e);
}
@@ -58,7 +72,10 @@ public class MessagingServiceComponent {
InitializerEventReceiver.getInstance().terminate();
TenantEventReceiver.getInstance().terminate();
TopologyEventReceiver.getInstance().terminate();
- log.info("Messaging Service component is deactivated");
+
StratosThreadPool.shutdown(StratosEventReceiver.STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID);
+ if (log.isDebugEnabled()) {
+ log.debug("Messaging Service component is deactivated");
+ }
} catch (Exception e) {
log.error("Could not de-activate Messaging Service component", e);
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
index 5ac89e6..8c29816 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/StratosEventReceiver.java
@@ -19,12 +19,71 @@
package org.apache.stratos.messaging.message.receiver;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.threading.StratosThreadPool;
+import org.apache.stratos.messaging.listener.EventListener;
+
import java.util.concurrent.ExecutorService;
-public class StratosEventReceiver {
+/**
+ * Abstraction for Event Receivers used in Stratos
+ */
+public abstract class StratosEventReceiver {
+
+ private static final Log log =
LogFactory.getLog(StratosEventReceiver.class);
+
+ /**
+ * Thread pool information for all StratosEventReceiver implementations
+ */
+
+ public static String STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID =
"stratos-event-receiver-pool";
+ private static String STRATOS_EVENT_RECEIEVER_THREAD_POOL_SIZE =
"stratos.event.receiver.pool.size";
+ // thread pool id
+ protected String threadPoolId;
+ // executor service used
protected ExecutorService executorService;
+ // pool size
+ protected static int threadPoolSize = 15;
+
+ static {
+ // check if the thread pool size is given as a system parameter
+ String poolSize =
System.getProperty(STRATOS_EVENT_RECEIEVER_THREAD_POOL_SIZE);
+ if (poolSize != null) {
+ try {
+ threadPoolSize = Integer.parseInt(poolSize);
+ } catch (NumberFormatException e) {
+ log.error("Invalid configuration found for
StratosEventReceiver thread pool size", e);
+ threadPoolSize = 15;
+ }
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Number of threads used in pool " +
STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID + " : " + threadPoolSize);
+ }
+ }
public StratosEventReceiver () {
+ this.threadPoolId = STRATOS_EVENT_RECEIEVER_THREAD_POOL_ID;
+ this.executorService =
StratosThreadPool.getExecutorService(threadPoolId, threadPoolSize);
}
+
+ /**
+ * Adds an EventListener to this StratosEventReceiver instance
+ *
+ * @param eventListener EventListener instance to add
+ */
+ public abstract void addEventListener(EventListener eventListener);
+
+ /**
+ * Removed an EventListener from this StratosEventReceiver instance
+ *
+ * @param eventListener EventListener instance to remove
+ */
+ public abstract void removeEventListener(EventListener eventListener);
+
+ /**
+ * Terminates this StratosEventReceiver instance
+ */
+ public abstract void terminate();
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
index 69dba01..89dd73e 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java
@@ -38,8 +38,6 @@ public class ApplicationsEventReceiver extends
StratosEventReceiver{
private static volatile ApplicationsEventReceiver instance;
private ApplicationsEventReceiver() {
- // TODO: make pool size configurable
- this.executorService =
StratosThreadPool.getExecutorService("application-event-receiver", 100);
ApplicationsEventMessageQueue messageQueue = new
ApplicationsEventMessageQueue();
this.messageDelegator = new
ApplicationsEventMessageDelegator(messageQueue);
this.messageListener = new
ApplicationsEventMessageListener(messageQueue);
@@ -66,7 +64,7 @@ public class ApplicationsEventReceiver extends
StratosEventReceiver{
messageDelegator.removeEventListener(eventListener);
}
- public void execute() {
+ private void execute() {
try {
// Start topic subscriber thread
eventSubscriber = new
EventSubscriber(MessagingUtil.Topics.APPLICATION_TOPIC.getTopicName(),
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java
index adf805d..59374bb 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventMessageDelegator.java
@@ -46,6 +46,10 @@ class ApplicationSignUpEventMessageDelegator implements
Runnable {
processorChain.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ processorChain.removeEventListener(eventListener);
+ }
+
@Override
public void run() {
try {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
index df90cf9..5ad6070 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java
@@ -43,8 +43,6 @@ public class ApplicationSignUpEventReceiver extends
StratosEventReceiver {
private static volatile ApplicationSignUpEventReceiver instance;
private ApplicationSignUpEventReceiver() {
- // TODO: make pool size configurable
- this.executorService =
StratosThreadPool.getExecutorService("application-signup-event-receiver", 100);
ApplicationSignUpEventMessageQueue messageQueue = new
ApplicationSignUpEventMessageQueue();
this.messageDelegator = new
ApplicationSignUpEventMessageDelegator(messageQueue);
this.messageListener = new
ApplicationSignUpEventMessageListener(messageQueue);
@@ -67,6 +65,10 @@ public class ApplicationSignUpEventReceiver extends
StratosEventReceiver {
messageDelegator.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ messageDelegator.removeEventListener(eventListener);
+ }
+
private void execute() {
try {
// Start topic subscriber thread
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
index 5c9c502..954d9be 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventMessageDelegator.java
@@ -46,6 +46,10 @@ class ClusterStatusEventMessageDelegator implements Runnable
{
processorChain.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ processorChain.removeEventListener(eventListener);
+ }
+
@Override
public void run() {
try {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
index be42b43..9de351b 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java
@@ -39,8 +39,6 @@ public class ClusterStatusEventReceiver extends
StratosEventReceiver {
private static volatile ClusterStatusEventReceiver instance;
private ClusterStatusEventReceiver() {
- // TODO: make pool size configurable
- this.executorService =
StratosThreadPool.getExecutorService("clusterstatus-event-receiver", 100);
ClusterStatusEventMessageQueue messageQueue = new
ClusterStatusEventMessageQueue();
this.messageDelegator = new
ClusterStatusEventMessageDelegator(messageQueue);
this.messageListener = new
ClusterStatusEventMessageListener(messageQueue);
@@ -51,6 +49,10 @@ public class ClusterStatusEventReceiver extends
StratosEventReceiver {
messageDelegator.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ messageDelegator.removeEventListener(eventListener);
+ }
+
public static ClusterStatusEventReceiver getInstance () {
if (instance == null) {
synchronized (ClusterStatusEventReceiver.class) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java
index fa783a9..03154f2 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventMessageDelegator.java
@@ -46,6 +46,10 @@ class DomainMappingEventMessageDelegator implements Runnable
{
processorChain.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ processorChain.removeEventListener(eventListener);
+ }
+
@Override
public void run() {
try {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
index 6de99c0..6c88f73 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java
@@ -40,8 +40,6 @@ public class DomainMappingEventReceiver extends
StratosEventReceiver {
private static volatile DomainMappingEventReceiver instance;
private DomainMappingEventReceiver() {
- // TODO: make pool size configurable
- this.executorService =
StratosThreadPool.getExecutorService("domainmapping-event-receiver", 100);
DomainMappingEventMessageQueue messageQueue = new
DomainMappingEventMessageQueue();
this.messageDelegator = new
DomainMappingEventMessageDelegator(messageQueue);
this.messageListener = new
DomainMappingEventMessageListener(messageQueue);
@@ -52,6 +50,10 @@ public class DomainMappingEventReceiver extends
StratosEventReceiver {
messageDelegator.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ messageDelegator.removeEventListener(eventListener);
+ }
+
public static DomainMappingEventReceiver getInstance () {
if (instance == null) {
synchronized (DomainMappingEventReceiver.class) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
index 2cde2a9..29fb47b 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
@@ -48,6 +48,10 @@ class HealthStatEventMessageDelegator implements Runnable {
processorChain.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ processorChain.removeEventListener(eventListener);
+ }
+
@Override
public void run() {
try {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
index a9d2602..442bdb6 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java
@@ -39,8 +39,6 @@ public class HealthStatEventReceiver extends
StratosEventReceiver {
private static volatile HealthStatEventReceiver instance;
private HealthStatEventReceiver() {
- // TODO: make pool size configurable
- this.executorService =
StratosThreadPool.getExecutorService("healthstat-event-receiver", 100);
HealthStatEventMessageQueue messageQueue = new
HealthStatEventMessageQueue();
this.messageDelegator = new
HealthStatEventMessageDelegator(messageQueue);
this.messageListener = new
HealthStatEventMessageListener(messageQueue);
@@ -63,6 +61,9 @@ public class HealthStatEventReceiver extends
StratosEventReceiver {
messageDelegator.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ messageDelegator.removeEventListener(eventListener);
+ }
private void execute() {
try {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
index ffd2ae4..baca350 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventMessageDelegator.java
@@ -41,6 +41,10 @@ public class InitializerEventMessageDelegator implements
Runnable {
processorChain.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ processorChain.removeEventListener(eventListener);
+ }
+
@Override
public void run() {
try {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
index 805a8bf..c7e5daf 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java
@@ -26,7 +26,6 @@ import org.apache.stratos.messaging.listener.EventListener;
import org.apache.stratos.messaging.message.receiver.StratosEventReceiver;
import org.apache.stratos.messaging.util.MessagingUtil;
-import java.util.concurrent.ExecutorService;
public class InitializerEventReceiver extends StratosEventReceiver {
private static final Log log =
LogFactory.getLog(InitializerEventReceiver.class);
@@ -38,8 +37,6 @@ public class InitializerEventReceiver extends
StratosEventReceiver {
//private ExecutorService executorService;
private InitializerEventReceiver() {
- // TODO: make pool size configurable
- this.executorService =
StratosThreadPool.getExecutorService("initializer-event-receiver", 100);
InitializerEventMessageQueue initializerEventMessageQueue = new
InitializerEventMessageQueue();
this.messageDelegator = new
InitializerEventMessageDelegator(initializerEventMessageQueue);
this.messageListener = new
InitializerEventMessageListener(initializerEventMessageQueue);
@@ -62,6 +59,10 @@ public class InitializerEventReceiver extends
StratosEventReceiver {
messageDelegator.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ messageDelegator.removeEventListener(eventListener);
+ }
+
private void execute() {
try {
// Start topic subscriber thread
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
index 73ef9fe..b695db7 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventMessageDelegator.java
@@ -46,6 +46,10 @@ class InstanceNotifierEventMessageDelegator implements
Runnable {
processorChain.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ processorChain.removeEventListener(eventListener);
+ }
+
@Override
public void run() {
try {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
index e0b8e9f..5bcd75a 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java
@@ -39,8 +39,6 @@ public class InstanceNotifierEventReceiver extends
StratosEventReceiver {
//private boolean terminated;
private InstanceNotifierEventReceiver() {
- // TODO: make pool size configurable
- this.executorService =
StratosThreadPool.getExecutorService("topology-event-receiver", 100);
InstanceNotifierEventMessageQueue messageQueue = new
InstanceNotifierEventMessageQueue();
this.messageDelegator = new
InstanceNotifierEventMessageDelegator(messageQueue);
this.messageListener = new
InstanceNotifierEventMessageListener(messageQueue);
@@ -63,6 +61,10 @@ public class InstanceNotifierEventReceiver extends
StratosEventReceiver {
messageDelegator.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ messageDelegator.removeEventListener(eventListener);
+ }
+
private void execute() {
try {
// Start topic subscriber thread
@@ -94,7 +96,5 @@ public class InstanceNotifierEventReceiver extends
StratosEventReceiver {
public synchronized void terminate() {
eventSubscriber.terminate();
messageDelegator.terminate();
- //terminated = true;
- log.info("InstanceNotifierEventReceiver terminated");
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java
index 9f754b0..e5df65e 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventMessageDelegator.java
@@ -46,6 +46,10 @@ class InstanceStatusEventMessageDelegator implements
Runnable {
processorChain.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ processorChain.removeEventListener(eventListener);
+ }
+
@Override
public void run() {
try {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
index a565ea9..3d9f793 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java
@@ -38,8 +38,6 @@ public class InstanceStatusEventReceiver extends
StratosEventReceiver {
private static volatile InstanceStatusEventReceiver instance;
private InstanceStatusEventReceiver() {
- // TODO: make pool size configurable
- this.executorService =
StratosThreadPool.getExecutorService("topology-event-receiver", 100);
InstanceStatusEventMessageQueue messageQueue = new
InstanceStatusEventMessageQueue();
this.messageDelegator = new
InstanceStatusEventMessageDelegator(messageQueue);
this.messageListener = new
InstanceStatusEventMessageListener(messageQueue);
@@ -62,6 +60,9 @@ public class InstanceStatusEventReceiver extends
StratosEventReceiver {
messageDelegator.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ messageDelegator.removeEventListener(eventListener);
+ }
private void execute() {
try {
@@ -91,6 +92,5 @@ public class InstanceStatusEventReceiver extends
StratosEventReceiver {
public void terminate() {
eventSubscriber.terminate();
messageDelegator.terminate();
- // terminated = true;
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
index c735d9b..cd8724c 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventMessageDelegator.java
@@ -48,6 +48,10 @@ class TenantEventMessageDelegator implements Runnable {
processorChain.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ processorChain.removeEventListener(eventListener);
+ }
+
@Override
public void run() {
try {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
index a52cb20..e30d3ab 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java
@@ -42,8 +42,6 @@ public class TenantEventReceiver extends StratosEventReceiver
{
private static volatile TenantEventReceiver instance;
private TenantEventReceiver() {
- // TODO: make pool size configurable
- this.executorService =
StratosThreadPool.getExecutorService("tenant-event-receiver", 100);
TenantEventMessageQueue messageQueue = new TenantEventMessageQueue();
this.messageDelegator = new TenantEventMessageDelegator(messageQueue);
this.messageListener = new TenantEventMessageListener(messageQueue);
@@ -66,6 +64,10 @@ public class TenantEventReceiver extends
StratosEventReceiver {
messageDelegator.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ messageDelegator.removeEventListener(eventListener);
+ }
+
private void execute() {
try {
// Start topic subscriber thread
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
index 8508d91..d2664f4 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventMessageDelegator.java
@@ -47,6 +47,10 @@ class TopologyEventMessageDelegator implements Runnable {
processorChain.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ processorChain.removeEventListener(eventListener);
+ }
+
@Override
public void run() {
try {
http://git-wip-us.apache.org/repos/asf/stratos/blob/c90eb9a7/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
index bfa3950..4f1f254 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java
@@ -44,8 +44,6 @@ public class TopologyEventReceiver extends
StratosEventReceiver {
private static volatile TopologyEventReceiver instance;
private TopologyEventReceiver() {
- // TODO: make pool size configurable
- this.executorService =
StratosThreadPool.getExecutorService("topology-event-receiver", 100);
TopologyEventMessageQueue messageQueue = new
TopologyEventMessageQueue();
this.messageDelegator = new
TopologyEventMessageDelegator(messageQueue);
this.messageListener = new TopologyEventMessageListener(messageQueue);
@@ -68,6 +66,10 @@ public class TopologyEventReceiver extends
StratosEventReceiver {
messageDelegator.addEventListener(eventListener);
}
+ public void removeEventListener(EventListener eventListener) {
+ messageDelegator.removeEventListener(eventListener);
+ }
+
private void execute() {
try {
// Start topic subscriber thread