http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java index 4b86529..78998c7 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/ApplicationsEventReceiver.java @@ -29,8 +29,6 @@ import org.apache.stratos.messaging.listener.EventListener; import org.apache.stratos.messaging.message.receiver.StratosEventReceiver; import org.apache.stratos.messaging.util.MessagingUtil; -import java.util.concurrent.ExecutorService; - public class ApplicationsEventReceiver extends StratosEventReceiver{ private static final Log log = LogFactory.getLog(ApplicationsEventReceiver.class); @@ -41,7 +39,7 @@ public class ApplicationsEventReceiver extends StratosEventReceiver{ private ApplicationsEventReceiver() { // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("application-event-receiver", 100); + this.executor = StratosThreadPool.getExecutorService("application-event-receiver", 35, 100); ApplicationsEventMessageQueue messageQueue = new ApplicationsEventMessageQueue(); this.messageDelegator = new ApplicationsEventMessageDelegator(messageQueue); this.messageListener = new ApplicationsEventMessageListener(messageQueue); @@ -73,14 +71,14 @@ public class ApplicationsEventReceiver extends StratosEventReceiver{ // Start topic subscriber thread eventSubscriber = new EventSubscriber(MessagingUtil.Topics.APPLICATION_TOPIC.getTopicName(), messageListener); - executorService.execute(eventSubscriber); + executor.execute(eventSubscriber); if (log.isDebugEnabled()) { log.debug("Application status event message receiver thread started"); } // Start Application status event message delegator thread - executorService.execute(messageDelegator); + executor.execute(messageDelegator); if (log.isDebugEnabled()) { log.debug("Application status event message delegator thread started"); @@ -99,7 +97,7 @@ public class ApplicationsEventReceiver extends StratosEventReceiver{ } public void initializeCompleteApplicationsModel() { - executorService.execute(new Runnable() { + executor.execute(new Runnable() { @Override public void run() { while (!eventSubscriber.isSubscribed()) { @@ -119,10 +117,10 @@ public class ApplicationsEventReceiver extends StratosEventReceiver{ } // public ExecutorService getExecutorService() { -// return executorService; +// return executor; // } // -// public void setExecutorService(ExecutorService executorService) { -// this.executorService = executorService; +// public void setExecutorService(ExecutorService executor) { +// this.executor = executor; // } }
http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java index dde214d..65ca857 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/application/signup/ApplicationSignUpEventReceiver.java @@ -30,8 +30,6 @@ import org.apache.stratos.messaging.listener.EventListener; import org.apache.stratos.messaging.message.receiver.StratosEventReceiver; import org.apache.stratos.messaging.util.MessagingUtil; -import java.util.concurrent.ExecutorService; - /** * Application signup event receiver. */ @@ -46,7 +44,7 @@ public class ApplicationSignUpEventReceiver extends StratosEventReceiver { private ApplicationSignUpEventReceiver() { // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("application-signup-event-receiver", 100); + this.executor = StratosThreadPool.getExecutorService("application-signup-event-receiver", 35, 100); ApplicationSignUpEventMessageQueue messageQueue = new ApplicationSignUpEventMessageQueue(); this.messageDelegator = new ApplicationSignUpEventMessageDelegator(messageQueue); this.messageListener = new ApplicationSignUpEventMessageListener(messageQueue); @@ -75,14 +73,14 @@ public class ApplicationSignUpEventReceiver extends StratosEventReceiver { eventSubscriber = new EventSubscriber(MessagingUtil.Topics.APPLICATION_SIGNUP_TOPIC.getTopicName(), messageListener); // subscriber.setMessageListener(messageListener); - executorService.execute(eventSubscriber); + executor.execute(eventSubscriber); if (log.isDebugEnabled()) { log.debug("Application signup event message receiver thread started"); } // Start topology event message delegator thread - executorService.execute(messageDelegator); + executor.execute(messageDelegator); if (log.isDebugEnabled()) { log.debug("Application signup event message delegator thread started"); } @@ -96,7 +94,7 @@ public class ApplicationSignUpEventReceiver extends StratosEventReceiver { } public void initializeCompleteApplicationSignUps() { - executorService.execute(new Runnable() { + executor.execute(new Runnable() { @Override public void run() { while (!eventSubscriber.isSubscribed()) { @@ -121,10 +119,10 @@ public class ApplicationSignUpEventReceiver extends StratosEventReceiver { } // public ExecutorService getExecutorService() { -// return executorService; +// return executor; // } -// public void setExecutorService(ExecutorService executorService) { -// this.executorService = executorService; +// public void setExecutorService(ExecutorService executor) { +// this.executor = executor; // } } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java index 2b4d557..fbfbc4b 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java @@ -27,8 +27,6 @@ import org.apache.stratos.messaging.listener.EventListener; import org.apache.stratos.messaging.message.receiver.StratosEventReceiver; import org.apache.stratos.messaging.util.MessagingUtil; -import java.util.concurrent.ExecutorService; - /** * A thread for receiving instance notifier information from message broker. */ @@ -41,7 +39,7 @@ public class ClusterStatusEventReceiver extends StratosEventReceiver { private ClusterStatusEventReceiver() { // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("clusterstatus-event-receiver", 100); + this.executor = StratosThreadPool.getExecutorService("clusterstatus-event-receiver", 35, 100); ClusterStatusEventMessageQueue messageQueue = new ClusterStatusEventMessageQueue(); this.messageDelegator = new ClusterStatusEventMessageDelegator(messageQueue); this.messageListener = new ClusterStatusEventMessageListener(messageQueue); @@ -68,14 +66,14 @@ public class ClusterStatusEventReceiver extends StratosEventReceiver { try { // Start topic subscriber thread eventSubscriber = new EventSubscriber(MessagingUtil.Topics.CLUSTER_STATUS_TOPIC.getTopicName(), messageListener); - executorService.execute(eventSubscriber); + executor.execute(eventSubscriber); if (log.isDebugEnabled()) { log.debug("InstanceNotifier event message receiver thread started"); } // Start instance notifier event message delegator thread - executorService.execute(messageDelegator); + executor.execute(messageDelegator); if (log.isDebugEnabled()) { log.debug("InstanceNotifier event message delegator thread started"); } @@ -99,10 +97,10 @@ public class ClusterStatusEventReceiver extends StratosEventReceiver { // } // // public ExecutorService getExecutorService() { -// return executorService; +// return executor; // } // -// public void setExecutorService(ExecutorService executorService) { -// this.executorService = executorService; +// public void setExecutorService(ExecutorService executor) { +// this.executor = executor; // } } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java index 6b79873..5d80d32 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java @@ -27,8 +27,6 @@ import org.apache.stratos.messaging.listener.EventListener; import org.apache.stratos.messaging.message.receiver.StratosEventReceiver; import org.apache.stratos.messaging.util.MessagingUtil; -import java.util.concurrent.ExecutorService; - /** * Domain mapping event receiver. */ @@ -43,7 +41,8 @@ public class DomainMappingEventReceiver extends StratosEventReceiver { private DomainMappingEventReceiver() { // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("domainmapping-event-receiver", 100); + this.executor = StratosThreadPool.getExecutorService("domainmapping-event-receiver", 35, + 100); DomainMappingEventMessageQueue messageQueue = new DomainMappingEventMessageQueue(); this.messageDelegator = new DomainMappingEventMessageDelegator(messageQueue); this.messageListener = new DomainMappingEventMessageListener(messageQueue); @@ -71,7 +70,7 @@ public class DomainMappingEventReceiver extends StratosEventReceiver { // Start topic subscriber thread eventSubscriber = new EventSubscriber(MessagingUtil.Topics.DOMAIN_MAPPING_TOPIC.getTopicName(), messageListener); // subscriber.setMessageListener(messageListener); - executorService.execute(eventSubscriber); + executor.execute(eventSubscriber); if (log.isDebugEnabled()) { @@ -79,7 +78,7 @@ public class DomainMappingEventReceiver extends StratosEventReceiver { } // Start topology event message delegator thread - executorService.execute(messageDelegator); + executor.execute(messageDelegator); if (log.isDebugEnabled()) { log.debug("Domain mapping event message delegator thread started"); } @@ -98,10 +97,10 @@ public class DomainMappingEventReceiver extends StratosEventReceiver { // } // // public ExecutorService getExecutorService() { -// return executorService; +// return executor; // } // -// public void setExecutorService(ExecutorService executorService) { -// this.executorService = executorService; +// public void setExecutorService(ExecutorService executor) { +// this.executor = executor; // } } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java index ede8f17..e7dfb46 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java @@ -27,8 +27,6 @@ import org.apache.stratos.messaging.listener.EventListener; import org.apache.stratos.messaging.message.receiver.StratosEventReceiver; import org.apache.stratos.messaging.util.MessagingUtil; -import java.util.concurrent.ExecutorService; - /** * A thread for receiving health stat information from message broker */ @@ -42,7 +40,7 @@ public class HealthStatEventReceiver extends StratosEventReceiver { private HealthStatEventReceiver() { // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("healthstat-event-receiver", 100); + this.executor = StratosThreadPool.getExecutorService("healthstat-event-receiver", 35, 100); HealthStatEventMessageQueue messageQueue = new HealthStatEventMessageQueue(); this.messageDelegator = new HealthStatEventMessageDelegator(messageQueue); this.messageListener = new HealthStatEventMessageListener(messageQueue); @@ -71,13 +69,13 @@ public class HealthStatEventReceiver extends StratosEventReceiver { // Start topic subscriber thread eventSubscriber = new EventSubscriber(MessagingUtil.Topics.HEALTH_STAT_TOPIC.getTopicName(), messageListener); - executorService.execute(eventSubscriber); + executor.execute(eventSubscriber); if (log.isDebugEnabled()) { log.debug("Health stats event message delegator thread started"); } // Start health stat event message delegator thread - executorService.execute(messageDelegator); + executor.execute(messageDelegator); } catch (Exception e) { if (log.isErrorEnabled()) { log.error("Health stats receiver failed", e); @@ -92,10 +90,10 @@ public class HealthStatEventReceiver extends StratosEventReceiver { // } // // public ExecutorService getExecutorService() { -// return executorService; +// return executor; // } // -// public void setExecutorService(ExecutorService executorService) { -// this.executorService = executorService; +// public void setExecutorService(ExecutorService executor) { +// this.executor = executor; // } } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java index e6429e2..15e1b64 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/initializer/InitializerEventReceiver.java @@ -26,8 +26,6 @@ import org.apache.stratos.messaging.listener.EventListener; import org.apache.stratos.messaging.message.receiver.StratosEventReceiver; import org.apache.stratos.messaging.util.MessagingUtil; -import java.util.concurrent.ExecutorService; - public class InitializerEventReceiver extends StratosEventReceiver { private static final Log log = LogFactory.getLog(InitializerEventReceiver.class); @@ -35,11 +33,11 @@ public class InitializerEventReceiver extends StratosEventReceiver { private InitializerEventMessageListener messageListener; private EventSubscriber eventSubscriber; private static volatile InitializerEventReceiver instance; - //private ExecutorService executorService; + //private ExecutorService executor; private InitializerEventReceiver() { // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("initializer-event-receiver", 100); + this.executor = StratosThreadPool.getExecutorService("initializer-event-receiver", 35, 100); InitializerEventMessageQueue initializerEventMessageQueue = new InitializerEventMessageQueue(); this.messageDelegator = new InitializerEventMessageDelegator(initializerEventMessageQueue); this.messageListener = new InitializerEventMessageListener(initializerEventMessageQueue); @@ -67,13 +65,13 @@ public class InitializerEventReceiver extends StratosEventReceiver { // Start topic subscriber thread eventSubscriber = new EventSubscriber(MessagingUtil.Topics.INITIALIZER_TOPIC.getTopicName(), messageListener); - executorService.execute(eventSubscriber); + executor.execute(eventSubscriber); if (log.isDebugEnabled()) { log.debug("Initializer event message delegator thread started"); } // Start initializer event message delegator thread - executorService.execute(messageDelegator); + executor.execute(messageDelegator); } catch (Exception e) { if (log.isErrorEnabled()) { log.error("Initializer receiver failed", e); @@ -87,10 +85,10 @@ public class InitializerEventReceiver extends StratosEventReceiver { } // public ExecutorService getExecutorService() { -// return executorService; +// return executor; // } // -// public void setExecutorService(ExecutorService executorService) { -// this.executorService = executorService; +// public void setExecutorService(ExecutorService executor) { +// this.executor = executor; // } } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java index cfc7f11..e06a2e6 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java @@ -40,7 +40,7 @@ public class InstanceNotifierEventReceiver extends StratosEventReceiver { private InstanceNotifierEventReceiver() { // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100); + this.executor = StratosThreadPool.getExecutorService("topology-event-receiver", 35, 100); InstanceNotifierEventMessageQueue messageQueue = new InstanceNotifierEventMessageQueue(); this.messageDelegator = new InstanceNotifierEventMessageDelegator(messageQueue); messageListener = new InstanceNotifierEventMessageListener(messageQueue); @@ -107,14 +107,14 @@ public class InstanceNotifierEventReceiver extends StratosEventReceiver { // Start topic subscriber thread eventSubscriber = new EventSubscriber(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName(), messageListener); - executorService.execute(eventSubscriber); + executor.execute(eventSubscriber); if (log.isDebugEnabled()) { log.debug("Instance Notifier event message receiver thread started"); } // Start topology event message delegator thread - executorService.execute(messageDelegator); + executor.execute(messageDelegator); if (log.isDebugEnabled()) { log.debug("Instance Notifier event message delegator thread started"); } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java index a2a1623..867f461 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java @@ -39,7 +39,7 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver { private InstanceStatusEventReceiver() { // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100); + this.executor = StratosThreadPool.getExecutorService("topology-event-receiver", 35, 100); InstanceStatusEventMessageQueue messageQueue = new InstanceStatusEventMessageQueue(); this.messageDelegator = new InstanceStatusEventMessageDelegator(messageQueue); this.messageListener = new InstanceStatusEventMessageListener(messageQueue); @@ -67,13 +67,13 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver { try { // Start topic subscriber thread eventSubscriber = new EventSubscriber(MessagingUtil.Topics.INSTANCE_STATUS_TOPIC.getTopicName(), messageListener); - executorService.submit(eventSubscriber); + executor.submit(eventSubscriber); if (log.isDebugEnabled()) { log.debug("InstanceNotifier event message receiver thread started"); } // Start instance notifier event message delegate thread - executorService.submit(messageDelegator); + executor.submit(messageDelegator); if (log.isDebugEnabled()) { log.debug("InstanceNotifier event message delegator thread started"); } @@ -95,10 +95,10 @@ public class InstanceStatusEventReceiver extends StratosEventReceiver { } // public ExecutorService getExecutorService() { -// return executorService; +// return executor; // } // -// public void setExecutorService(ExecutorService executorService) { -// this.executorService = executorService; +// public void setExecutorService(ExecutorService executor) { +// this.executor = executor; // } } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java index 1c519b9..39ef2fb 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantEventReceiver.java @@ -30,8 +30,6 @@ import org.apache.stratos.messaging.listener.EventListener; import org.apache.stratos.messaging.message.receiver.StratosEventReceiver; import org.apache.stratos.messaging.util.MessagingUtil; -import java.util.concurrent.ExecutorService; - /** * A thread for receiving tenant information from message broker and * build tenant information in tenant manager. @@ -45,7 +43,7 @@ public class TenantEventReceiver extends StratosEventReceiver { private TenantEventReceiver() { // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("tenant-event-receiver", 100); + this.executor = StratosThreadPool.getExecutorService("tenant-event-receiver", 35, 100); TenantEventMessageQueue messageQueue = new TenantEventMessageQueue(); this.messageDelegator = new TenantEventMessageDelegator(messageQueue); this.messageListener = new TenantEventMessageListener(messageQueue); @@ -68,22 +66,22 @@ public class TenantEventReceiver extends StratosEventReceiver { messageDelegator.addEventListener(eventListener); } -// public void setExecutorService(ExecutorService executorService) { -// this.executorService = executorService; +// public void setExecutorService(ExecutorService executor) { +// this.executor = executor; // } private void execute() { try { // Start topic subscriber thread eventSubscriber = new EventSubscriber(MessagingUtil.Topics.TENANT_TOPIC.getTopicName(), messageListener); - executorService.execute(eventSubscriber); + executor.execute(eventSubscriber); if (log.isDebugEnabled()) { log.debug("Tenant event message receiver thread started"); } // Start tenant event message delegator thread - executorService.execute(messageDelegator); + executor.execute(messageDelegator); if (log.isDebugEnabled()) { log.debug("Tenant event message delegator thread started"); } @@ -97,7 +95,7 @@ public class TenantEventReceiver extends StratosEventReceiver { } public void initializeCompleteTenant() { - executorService.execute(new Runnable() { + executor.execute(new Runnable() { @Override public void run() { while (!eventSubscriber.isSubscribed()) { http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java index 50e078a..ea1571e 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/topology/TopologyEventReceiver.java @@ -30,8 +30,6 @@ import org.apache.stratos.messaging.listener.EventListener; import org.apache.stratos.messaging.message.receiver.StratosEventReceiver; import org.apache.stratos.messaging.util.MessagingUtil; -import java.util.concurrent.ExecutorService; - /** * A thread for receiving topology information from message broker and * build topology in topology manager. @@ -47,7 +45,7 @@ public class TopologyEventReceiver extends StratosEventReceiver { private TopologyEventReceiver() { // TODO: make pool size configurable - this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100); + this.executor = StratosThreadPool.getExecutorService("topology-event-receiver", 35, 100); TopologyEventMessageQueue messageQueue = new TopologyEventMessageQueue(); this.messageDelegator = new TopologyEventMessageDelegator(messageQueue); this.messageListener = new TopologyEventMessageListener(messageQueue); @@ -74,14 +72,14 @@ public class TopologyEventReceiver extends StratosEventReceiver { try { // Start topic subscriber thread eventSubscriber = new EventSubscriber(MessagingUtil.Topics.TOPOLOGY_TOPIC.getTopicName(), messageListener); - executorService.execute(eventSubscriber); + executor.execute(eventSubscriber); if (log.isDebugEnabled()) { log.debug("Topology event message receiver thread started"); } // Start topology event message delegator thread - executorService.execute(messageDelegator); + executor.execute(messageDelegator); if (log.isDebugEnabled()) { log.debug("Topology event message delegator thread started"); } @@ -100,7 +98,7 @@ public class TopologyEventReceiver extends StratosEventReceiver { } public void initializeCompleteTopology() { - executorService.execute(new Runnable() { + executor.execute(new Runnable() { @Override public void run() { while (!eventSubscriber.isSubscribed()) { @@ -119,10 +117,10 @@ public class TopologyEventReceiver extends StratosEventReceiver { } // public ExecutorService getExecutorService() { -// return executorService; +// return executor; // } // -// public void setExecutorService(ExecutorService executorService) { -// this.executorService = executorService; +// public void setExecutorService(ExecutorService executor) { +// this.executor = executor; // } } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataApplicationEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataApplicationEventReceiver.java b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataApplicationEventReceiver.java index e1bf929..d9934db 100644 --- a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataApplicationEventReceiver.java +++ b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataApplicationEventReceiver.java @@ -40,11 +40,11 @@ import java.util.concurrent.ExecutorService; public class MetadataApplicationEventReceiver { private static final Log log = LogFactory.getLog(MetadataApplicationEventReceiver.class); private ApplicationsEventReceiver applicationsEventReceiver; - //private ExecutorService executorService; + //private ExecutorService executor; public MetadataApplicationEventReceiver() { this.applicationsEventReceiver = ApplicationsEventReceiver.getInstance(); - //executorService = StratosThreadPool.getExecutorService(Constants + //executor = StratosThreadPool.getExecutorService(Constants // .METADATA_SERVICE_THREAD_POOL_ID, 20); addEventListeners(); } @@ -88,7 +88,7 @@ public class MetadataApplicationEventReceiver { } // public ExecutorService getExecutorService() { -// return executorService; +// return executor; // } } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java index f16282d..44a210d 100644 --- a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java +++ b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java @@ -38,11 +38,11 @@ import java.util.concurrent.ExecutorService; public class MetadataTopologyEventReceiver { private static final Log log = LogFactory.getLog(MetadataTopologyEventReceiver.class); private TopologyEventReceiver topologyEventReceiver; - private ExecutorService executorService; + //private ExecutorService executorService; public MetadataTopologyEventReceiver() { this.topologyEventReceiver = TopologyEventReceiver.getInstance(); -// //executorService = StratosThreadPool.getExecutorService(Constants +// //executor = StratosThreadPool.getExecutorService(Constants // .METADATA_SERVICE_THREAD_POOL_ID, 20); addEventListeners(); } @@ -84,7 +84,7 @@ public class MetadataTopologyEventReceiver { // } // } - public ExecutorService getExecutorService() { - return executorService; - } +// public ExecutorService getExecutorService() { +// return executorService; +// } } http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java index 9886335..df7145e 100644 --- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java +++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java @@ -41,10 +41,7 @@ import org.apache.stratos.mock.iaas.event.publisher.MockMemberEventPublisher; import org.apache.stratos.mock.iaas.statistics.publisher.MockHealthStatisticsNotifier; import java.io.Serializable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -65,8 +62,8 @@ public class MockInstance implements Serializable { private final MockInstanceContext mockInstanceContext; private final AtomicBoolean hasGracefullyShutdown = new AtomicBoolean(false); - private static final ExecutorService eventListenerExecutorService = StratosThreadPool - .getExecutorService("mock.iaas.event.listener.thread.pool", 100); + private static final ThreadPoolExecutor eventListenerExecutor = StratosThreadPool + .getExecutorService("mock.iaas.event.listener.thread.pool", 35, 100); private static final ScheduledExecutorService healthStatNotifierExecutorService = StratosThreadPool .getScheduledExecutorService("mock.iaas.health.statistics.notifier.thread.pool", 100); @@ -151,7 +148,7 @@ public class MockInstance implements Serializable { } } }); -// topologyEventReceiver.setExecutorService(eventListenerExecutorService); +// topologyEventReceiver.setExecutorService(eventListenerExecutor); // topologyEventReceiver.execute(); if (log.isDebugEnabled()) { log.debug(String.format( @@ -185,7 +182,7 @@ public class MockInstance implements Serializable { }); // TODO: Fix InstanceNotifierEventReceiver to use executor service // do not remove this since execute() is a blocking call -// eventListenerExecutorService.submit(new Runnable() { +// eventListenerExecutor.submit(new Runnable() { // @Override // public void run() { // instanceNotifierEventReceiver.execute(); http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/Main.java ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/Main.java b/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/Main.java index 73fa971..75bdde9 100644 --- a/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/Main.java +++ b/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/Main.java @@ -26,7 +26,7 @@ import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.load.balancer.common.topology.TopologyProvider; import org.apache.stratos.load.balancer.extension.api.LoadBalancerExtension; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; /** * AWS extension main class. @@ -34,7 +34,7 @@ import java.util.concurrent.ExecutorService; public class Main { private static final Log log = LogFactory.getLog(Main.class); - private static ExecutorService executorService; + private static ThreadPoolExecutor executor; public static void main(String[] args) { @@ -48,8 +48,8 @@ public class Main { log.info("AWS extension started"); } - executorService = StratosThreadPool.getExecutorService( - "aws.extension.thread.pool", 10); + executor = StratosThreadPool.getExecutorService( + "aws.extension.thread.pool", 5, 10); // Validate runtime parameters AWSExtensionContext.getInstance().validate(); TopologyProvider topologyProvider = new TopologyProvider(); @@ -58,7 +58,7 @@ public class Main { topologyProvider) : null; extension = new LoadBalancerExtension(new AWSLoadBalancer(), statisticsReader, topologyProvider); - extension.setExecutorService(executorService); + extension.setExecutorService(executor); extension.execute(); // Add shutdown hook http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/extensions/load-balancer/modules/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/modules/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java b/extensions/load-balancer/modules/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java index f56541d..03156fd 100644 --- a/extensions/load-balancer/modules/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java +++ b/extensions/load-balancer/modules/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java @@ -26,14 +26,14 @@ import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.load.balancer.common.topology.TopologyProvider; import org.apache.stratos.load.balancer.extension.api.LoadBalancerExtension; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; /** * HAProxy extension main class. */ public class Main { private static final Log log = LogFactory.getLog(Main.class); - private static ExecutorService executorService; + private static ThreadPoolExecutor executor; public static void main(String[] args) { @@ -63,14 +63,14 @@ public class Main { } }); - executorService = StratosThreadPool.getExecutorService("haproxy.extension.thread.pool", 10); + executor = StratosThreadPool.getExecutorService("haproxy.extension.thread.pool", 5, 10); // Validate runtime parameters HAProxyContext.getInstance().validate(); TopologyProvider topologyProvider = new TopologyProvider(); HAProxyStatisticsReader statisticsReader = HAProxyContext.getInstance().isCEPStatsPublisherEnabled() ? new HAProxyStatisticsReader(topologyProvider) : null; extension = new LoadBalancerExtension(new HAProxy(), statisticsReader, topologyProvider); - extension.setExecutorService(executorService); + extension.setExecutorService(executor); extension.execute(); } catch (Exception e) { if (log.isErrorEnabled()) { http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/extensions/load-balancer/modules/lvs-extension/src/main/java/org/apache/stratos/lvs/extension/Main.java ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/modules/lvs-extension/src/main/java/org/apache/stratos/lvs/extension/Main.java b/extensions/load-balancer/modules/lvs-extension/src/main/java/org/apache/stratos/lvs/extension/Main.java index 52463e0..c5491ae 100644 --- a/extensions/load-balancer/modules/lvs-extension/src/main/java/org/apache/stratos/lvs/extension/Main.java +++ b/extensions/load-balancer/modules/lvs-extension/src/main/java/org/apache/stratos/lvs/extension/Main.java @@ -26,14 +26,14 @@ import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.load.balancer.common.topology.TopologyProvider; import org.apache.stratos.load.balancer.extension.api.LoadBalancerExtension; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; /** * LVS extension main class. */ public class Main { private static final Log log = LogFactory.getLog(Main.class); - private static ExecutorService executorService; + private static ThreadPoolExecutor executor; public static void main(String[] args) { @@ -63,14 +63,14 @@ public class Main { } }); - executorService = StratosThreadPool.getExecutorService("lvs.extension.thread.pool", 10); + executor = StratosThreadPool.getExecutorService("lvs.extension.thread.pool", 5, 10); // Validate runtime parameters LVSContext.getInstance().validate(); TopologyProvider topologyProvider = new TopologyProvider(); LVSStatisticsReader statisticsReader = LVSContext.getInstance().isCEPStatsPublisherEnabled() ? new LVSStatisticsReader(topologyProvider) : null; extension = new LoadBalancerExtension(new LVS(), statisticsReader, topologyProvider); - extension.setExecutorService(executorService); + extension.setExecutorService(executor); extension.execute(); } catch (Exception e) { if (log.isErrorEnabled()) { http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/extensions/load-balancer/modules/nginx-extension/src/main/java/org/apache/stratos/nginx/extension/Main.java ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/modules/nginx-extension/src/main/java/org/apache/stratos/nginx/extension/Main.java b/extensions/load-balancer/modules/nginx-extension/src/main/java/org/apache/stratos/nginx/extension/Main.java index ab0eb7a..b10e360 100644 --- a/extensions/load-balancer/modules/nginx-extension/src/main/java/org/apache/stratos/nginx/extension/Main.java +++ b/extensions/load-balancer/modules/nginx-extension/src/main/java/org/apache/stratos/nginx/extension/Main.java @@ -26,14 +26,14 @@ import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.load.balancer.common.topology.TopologyProvider; import org.apache.stratos.load.balancer.extension.api.LoadBalancerExtension; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; /** * Nginx extension main class. */ public class Main { private static final Log log = LogFactory.getLog(Main.class); - private static ExecutorService executorService; + private static ThreadPoolExecutor executor; public static void main(String[] args) { @@ -63,14 +63,14 @@ public class Main { } }); - executorService = StratosThreadPool.getExecutorService("nginx.extension.thread.pool", 10); + executor = StratosThreadPool.getExecutorService("nginx.extension.thread.pool", 5, 10); // Validate runtime parameters NginxContext.getInstance().validate(); TopologyProvider topologyProvider = new TopologyProvider(); NginxStatisticsReader statisticsReader = NginxContext.getInstance().isCEPStatsPublisherEnabled() ? new NginxStatisticsReader(topologyProvider) : null; extension = new LoadBalancerExtension(new Nginx(), statisticsReader, topologyProvider); - extension.setExecutorService(executorService); + extension.setExecutorService(executor); extension.execute(); } catch (Exception e) { if (log.isErrorEnabled()) { http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java index 4c8f19b..5b951dd 100644 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java @@ -123,7 +123,8 @@ public abstract class PythonAgentIntegrationTest { startActiveMQInstance(Integer.parseInt(amqpBindPorts[i]), Integer.parseInt(mqttBindPorts[i]), true); } - ExecutorService executorService = StratosThreadPool.getExecutorService("TEST_THREAD_POOL", testThreadPoolSize); + //ExecutorService executorService = StratosThreadPool.getExecutorService + // ("TEST_THREAD_POOL", testThreadPoolSize); topologyEventReceiver = TopologyEventReceiver.getInstance(); // topologyEventReceiver.setExecutorService(executorService); // topologyEventReceiver.execute(); http://git-wip-us.apache.org/repos/asf/stratos/blob/cef4fe1b/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java ---------------------------------------------------------------------- diff --git a/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java b/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java index bdd087b..a91c034 100644 --- a/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java +++ b/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java @@ -49,8 +49,8 @@ import org.apache.stratos.mock.iaas.client.MockIaasApiClient; import java.rmi.RemoteException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; import static org.testng.AssertJUnit.*; @@ -75,7 +75,8 @@ public class TopologyHandler { private TopologyEventReceiver topologyEventReceiver; private TenantEventReceiver tenantEventReceiver; private ApplicationSignUpEventReceiver applicationSignUpEventReceiver; - private ExecutorService executorService = StratosThreadPool.getExecutorService("stratos.integration.test.pool", 30); + private ThreadPoolExecutor executor = StratosThreadPool.getExecutorService("stratos.integration" + + ".test.pool", 20, 30); private Map<String, Long> terminatedMembers = new ConcurrentHashMap<>(); private Map<String, Long> terminatingMembers = new ConcurrentHashMap<>(); private Map<String, Long> createdMembers = new ConcurrentHashMap<String, Long>(); @@ -109,13 +110,13 @@ public class TopologyHandler { private void initializeApplicationSignUpEventReceiver() { applicationSignUpEventReceiver = ApplicationSignUpEventReceiver.getInstance(); -// applicationSignUpEventReceiver.setExecutorService(executorService); +// applicationSignUpEventReceiver.setExecutorService(executor); // applicationSignUpEventReceiver.execute(); } private void initializeTenantEventReceiver() { tenantEventReceiver = TenantEventReceiver.getInstance(); -// tenantEventReceiver.setExecutorService(executorService); +// tenantEventReceiver.setExecutorService(executor); // tenantEventReceiver.execute(); } @@ -124,7 +125,7 @@ public class TopologyHandler { */ private void initializeHealthStatsEventReceiver() { healthStatEventReceiver = HealthStatEventReceiver.getInstance(); -// healthStatEventReceiver.setExecutorService(executorService); +// healthStatEventReceiver.setExecutorService(executor); healthStatEventReceiver.addEventListener(new MemberFaultEventListener() { @Override protected void onEvent(Event event) { @@ -141,7 +142,7 @@ public class TopologyHandler { */ private void initializeApplicationEventReceiver() { applicationsEventReceiver = ApplicationsEventReceiver.getInstance(); -// applicationsEventReceiver.setExecutorService(executorService); +// applicationsEventReceiver.setExecutorService(executor); applicationsEventReceiver.addEventListener(new ApplicationInstanceActivatedEventListener() { @Override protected void onEvent(Event event) { @@ -172,7 +173,7 @@ public class TopologyHandler { */ private void initializeTopologyEventReceiver() { topologyEventReceiver = TopologyEventReceiver.getInstance(); -// topologyEventReceiver.setExecutorService(executorService); +// topologyEventReceiver.setExecutorService(executor); topologyEventReceiver.addEventListener(new MemberActivatedEventListener() { @Override protected void onEvent(Event event) { @@ -315,7 +316,7 @@ public class TopologyHandler { }; applicationsEventReceiver.addEventListener(activatedEventListener); - Future future = executorService.submit(new Runnable() { + Future future = executor.submit(new Runnable() { @Override public void run() { Application application = ApplicationManager.getApplications().getApplication(applicationId); @@ -391,7 +392,7 @@ public class TopologyHandler { }; applicationsEventReceiver.addEventListener(inactivatedEventListener); - Future future = executorService.submit(new Runnable() { + Future future = executor.submit(new Runnable() { @Override public void run() { Application application = ApplicationManager.getApplications().getApplication(applicationId);
