making ClusterStatusEventReceiver singleton and fixing references in components
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/27ae4baf Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/27ae4baf Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/27ae4baf Branch: refs/heads/master Commit: 27ae4bafc268a59dcd65336ca9ba491eaeae7426 Parents: c627ff1 Author: Isuru Haththotuwa <[email protected]> Authored: Wed Dec 2 18:30:49 2015 +0530 Committer: Isuru Haththotuwa <[email protected]> Committed: Thu Dec 24 20:04:51 2015 +0530 ---------------------------------------------------------------------- .../CloudControllerServiceComponent.java | 4 +- .../status/ClusterStatusTopicReceiver.java | 59 ++++++++++---------- .../status/ClusterStatusEventReceiver.java | 51 +++++++++++------ .../mapping/DomainMappingEventReceiver.java | 27 ++++----- 4 files changed, 78 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/27ae4baf/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java index 2368596..74d36e7 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java @@ -154,8 +154,8 @@ public class CloudControllerServiceComponent { } clusterStatusTopicReceiver = new ClusterStatusTopicReceiver(); - clusterStatusTopicReceiver.setExecutorService(executorService); - clusterStatusTopicReceiver.execute(); +// clusterStatusTopicReceiver.setExecutorService(executorService); +// clusterStatusTopicReceiver.execute(); if (log.isInfoEnabled()) { log.info("Cluster status event receiver thread started"); http://git-wip-us.apache.org/repos/asf/stratos/blob/27ae4baf/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java index daa6bf5..e0b9f62 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java @@ -32,28 +32,27 @@ import java.util.concurrent.ExecutorService; public class ClusterStatusTopicReceiver { private static final Log log = LogFactory.getLog(ClusterStatusTopicReceiver.class); - private ClusterStatusEventReceiver statusEventReceiver; - private boolean terminated; - private ExecutorService executorService; + private ClusterStatusEventReceiver clusterStatusEventReceiver; + //private boolean terminated; + //private ExecutorService executorService; public ClusterStatusTopicReceiver() { - this.statusEventReceiver = new ClusterStatusEventReceiver(); - + this.clusterStatusEventReceiver = ClusterStatusEventReceiver.getInstance(); addEventListeners(); } - public void execute() { - statusEventReceiver.setExecutorService(executorService); - statusEventReceiver.execute(); - if (log.isInfoEnabled()) { - log.info("Cloud controller Cluster status thread started"); - } - - } +// public void execute() { +// clusterStatusEventReceiver.setExecutorService(executorService); +// clusterStatusEventReceiver.execute(); +// if (log.isInfoEnabled()) { +// log.info("Cloud controller Cluster status thread started"); +// } +// +// } private void addEventListeners() { // Listen to topology events that affect clusters - statusEventReceiver.addEventListener(new ClusterStatusClusterResetEventListener() { + clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterResetEventListener() { @Override protected void onEvent(Event event) { try { @@ -64,14 +63,14 @@ public class ClusterStatusTopicReceiver { } }); - statusEventReceiver.addEventListener(new ClusterStatusClusterInstanceCreatedEventListener() { + clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterInstanceCreatedEventListener() { @Override protected void onEvent(Event event) { //TopologyBuilder.handleClusterInstanceCreated((ClusterStatusClusterInstanceCreatedEvent) event); } }); - statusEventReceiver.addEventListener(new ClusterStatusClusterActivatedEventListener() { + clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterActivatedEventListener() { @Override protected void onEvent(Event event) { try { @@ -82,7 +81,7 @@ public class ClusterStatusTopicReceiver { } }); - statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatedEventListener() { + clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterTerminatedEventListener() { @Override protected void onEvent(Event event) { try { @@ -93,7 +92,7 @@ public class ClusterStatusTopicReceiver { } }); - statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatingEventListener() { + clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterTerminatingEventListener() { @Override protected void onEvent(Event event) { try { @@ -104,7 +103,7 @@ public class ClusterStatusTopicReceiver { } }); - statusEventReceiver.addEventListener(new ClusterStatusClusterInactivateEventListener() { + clusterStatusEventReceiver.addEventListener(new ClusterStatusClusterInactivateEventListener() { @Override protected void onEvent(Event event) { try { @@ -116,15 +115,15 @@ public class ClusterStatusTopicReceiver { }); } - public void setTerminated(boolean terminated) { - this.terminated = terminated; - } - - public ExecutorService getExecutorService() { - return executorService; - } - - public void setExecutorService(ExecutorService executorService) { - this.executorService = executorService; - } +// public void setTerminated(boolean terminated) { +// this.terminated = terminated; +// } +// +// public ExecutorService getExecutorService() { +// return executorService; +// } +// +// public void setExecutorService(ExecutorService executorService) { +// this.executorService = executorService; +// } } http://git-wip-us.apache.org/repos/asf/stratos/blob/27ae4baf/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java index 119cf49..2b4d557 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java @@ -21,8 +21,10 @@ package org.apache.stratos.messaging.message.receiver.cluster.status; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.messaging.broker.subscribe.EventSubscriber; import org.apache.stratos.messaging.listener.EventListener; +import org.apache.stratos.messaging.message.receiver.StratosEventReceiver; import org.apache.stratos.messaging.util.MessagingUtil; import java.util.concurrent.ExecutorService; @@ -30,26 +32,39 @@ import java.util.concurrent.ExecutorService; /** * A thread for receiving instance notifier information from message broker. */ -public class ClusterStatusEventReceiver { +public class ClusterStatusEventReceiver extends StratosEventReceiver { private static final Log log = LogFactory.getLog(ClusterStatusEventReceiver.class); private final ClusterStatusEventMessageDelegator messageDelegator; private final ClusterStatusEventMessageListener messageListener; private EventSubscriber eventSubscriber; - private boolean terminated; - private ExecutorService executorService; + private static volatile ClusterStatusEventReceiver instance; - public ClusterStatusEventReceiver() { + private ClusterStatusEventReceiver() { + // TODO: make pool size configurable + this.executorService = StratosThreadPool.getExecutorService("clusterstatus-event-receiver", 100); ClusterStatusEventMessageQueue messageQueue = new ClusterStatusEventMessageQueue(); this.messageDelegator = new ClusterStatusEventMessageDelegator(messageQueue); this.messageListener = new ClusterStatusEventMessageListener(messageQueue); + execute(); } public void addEventListener(EventListener eventListener) { messageDelegator.addEventListener(eventListener); } + public static ClusterStatusEventReceiver getInstance () { + if (instance == null) { + synchronized (ClusterStatusEventReceiver.class) { + if (instance == null) { + instance = new ClusterStatusEventReceiver(); + } + } + } - public void execute() { + return instance; + } + + private void execute() { try { // Start topic subscriber thread eventSubscriber = new EventSubscriber(MessagingUtil.Topics.CLUSTER_STATUS_TOPIC.getTopicName(), messageListener); @@ -77,17 +92,17 @@ public class ClusterStatusEventReceiver { return ((eventSubscriber != null) && (eventSubscriber.isSubscribed())); } - public void terminate() { - eventSubscriber.terminate(); - messageDelegator.terminate(); - terminated = true; - } - - public ExecutorService getExecutorService() { - return executorService; - } - - public void setExecutorService(ExecutorService executorService) { - this.executorService = executorService; - } +// public void terminate() { +// eventSubscriber.terminate(); +// messageDelegator.terminate(); +// terminated = true; +// } +// +// public ExecutorService getExecutorService() { +// return executorService; +// } +// +// public void setExecutorService(ExecutorService executorService) { +// this.executorService = executorService; +// } } http://git-wip-us.apache.org/repos/asf/stratos/blob/27ae4baf/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java index 3c723a3..6b79873 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java @@ -47,6 +47,7 @@ public class DomainMappingEventReceiver extends StratosEventReceiver { DomainMappingEventMessageQueue messageQueue = new DomainMappingEventMessageQueue(); this.messageDelegator = new DomainMappingEventMessageDelegator(messageQueue); this.messageListener = new DomainMappingEventMessageListener(messageQueue); + execute(); } public void addEventListener(EventListener eventListener) { @@ -65,7 +66,7 @@ public class DomainMappingEventReceiver extends StratosEventReceiver { return instance; } - public void execute() { + private void execute() { try { // Start topic subscriber thread eventSubscriber = new EventSubscriber(MessagingUtil.Topics.DOMAIN_MAPPING_TOPIC.getTopicName(), messageListener); @@ -91,16 +92,16 @@ public class DomainMappingEventReceiver extends StratosEventReceiver { } } - public void terminate() { - eventSubscriber.terminate(); - messageDelegator.terminate(); - } - - public ExecutorService getExecutorService() { - return executorService; - } - - public void setExecutorService(ExecutorService executorService) { - this.executorService = executorService; - } +// public void terminate() { +// eventSubscriber.terminate(); +// messageDelegator.terminate(); +// } +// +// public ExecutorService getExecutorService() { +// return executorService; +// } +// +// public void setExecutorService(ExecutorService executorService) { +// this.executorService = executorService; +// } }
