fixing conflicts in StratosThreadPool
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/82980e64 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/82980e64 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/82980e64 Branch: refs/heads/stratos-4.1.x Commit: 82980e64a3b935a93e9cfad0916c32cc8bebc76b Parents: 23741d8 Author: Isuru Haththotuwa <[email protected]> Authored: Fri Dec 4 05:22:40 2015 +0530 Committer: Isuru Haththotuwa <[email protected]> Committed: Thu Dec 17 11:39:26 2015 +0530 ---------------------------------------------------------------------- .../stratos/cartridge/agent/CartridgeAgent.java | 30 ++--- .../agent/CartridgeAgentEventListeners.java | 46 ++++---- .../agent/test/JavaCartridgeAgentTest.java | 10 +- .../CloudControllerServiceComponent.java | 4 +- .../status/InstanceStatusTopicReceiver.java | 24 ++-- .../common/threading/StratosThreadPool.java | 4 +- .../StratosManagerServiceComponent.java | 4 +- ...ratosManagerInstanceStatusEventReceiver.java | 22 ++-- .../notifier/InstanceNotifierEventReceiver.java | 109 +++++++++++++------ .../status/InstanceStatusEventReceiver.java | 44 +++++--- .../mock/iaas/services/impl/MockInstance.java | 32 +++--- .../tests/PythonAgentIntegrationTest.java | 6 +- .../integration/tests/adc/GitHookTestCase.java | 22 ++-- 13 files changed, 208 insertions(+), 149 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java index b0bf326..c498caa 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java @@ -90,10 +90,10 @@ public class CartridgeAgent implements Runnable { } // Start instance notifier listener thread - registerInstanceNotifierEventListeners(); - if (log.isInfoEnabled()) { - log.info("Cartridge agent registerInstanceNotifierEventListeners done"); - } +// registerInstanceNotifierEventListeners(); +// if (log.isInfoEnabled()) { +// log.info("Cartridge agent registerInstanceNotifierEventListeners done"); +// } // Start tenant event receiver thread /* @@ -174,17 +174,17 @@ public class CartridgeAgent implements Runnable { logPublisherManager.stop(); } - protected void registerInstanceNotifierEventListeners() { - if (log.isDebugEnabled()) { - log.debug("SsubscribeToTopicsAndRegisterListeners before"); - } - - eventListenerns.startInstanceNotifierReceiver(); - - if (log.isDebugEnabled()) { - log.debug("SsubscribeToTopicsAndRegisterListeners after"); - } - } +// protected void registerInstanceNotifierEventListeners() { +// if (log.isDebugEnabled()) { +// log.debug("SsubscribeToTopicsAndRegisterListeners before"); +// } +// +// eventListenerns.startInstanceNotifierReceiver(); +// +// if (log.isDebugEnabled()) { +// log.debug("SsubscribeToTopicsAndRegisterListeners after"); +// } +// } // protected void registerTopologyEventListeners() { // if (log.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java index ffa3750..5954b76 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java @@ -65,8 +65,8 @@ public class CartridgeAgentEventListeners { private ApplicationSignUpEventReceiver applicationsEventReceiver; private ExtensionHandler extensionHandler; - private static final ExecutorService eventListenerExecutorService = - StratosThreadPool.getExecutorService("cartridge.agent.event.listener.thread.pool", 10); +// private static final ExecutorService eventListenerExecutorService = +// StratosThreadPool.getExecutorService("cartridge.agent.event.listener.thread.pool", 10); public CartridgeAgentEventListeners() { if (log.isDebugEnabled()) { @@ -77,7 +77,7 @@ public class CartridgeAgentEventListeners { this.topologyEventReceiver = TopologyEventReceiver.getInstance(); //this.topologyEventReceiver.setExecutorService(eventListenerExecutorService); - this.instanceNotifierEventReceiver = new InstanceNotifierEventReceiver(); + this.instanceNotifierEventReceiver = InstanceNotifierEventReceiver.getInstance(); this.tenantEventReceiver = TenantEventReceiver.getInstance(); // this.tenantEventReceiver.setExecutorService(eventListenerExecutorService); @@ -113,23 +113,23 @@ public class CartridgeAgentEventListeners { // // } - public void startInstanceNotifierReceiver() { - - if (log.isDebugEnabled()) { - log.debug("Starting cartridge agent instance notifier event message receiver"); - } - - eventListenerExecutorService.submit(new Runnable() { - @Override - public void run() { - instanceNotifierEventReceiver.execute(); - } - }); - - if (log.isDebugEnabled()) { - log.debug("Cartridge agent Instance notifier event message receiver started, waiting for event messages ..."); - } - } +// public void startInstanceNotifierReceiver() { +// +// if (log.isDebugEnabled()) { +// log.debug("Starting cartridge agent instance notifier event message receiver"); +// } +// +// eventListenerExecutorService.submit(new Runnable() { +// @Override +// public void run() { +// instanceNotifierEventReceiver.execute(); +// } +// }); +// +// if (log.isDebugEnabled()) { +// log.debug("Cartridge agent Instance notifier event message receiver started, waiting for event messages ..."); +// } +// } // public void startTenantEventReceiver() { // @@ -521,9 +521,9 @@ public class CartridgeAgentEventListeners { * Terminate load balancer topology receiver thread. */ - public void terminate() { - topologyEventReceiver.terminate(); - } +// public void terminate() { +// topologyEventReceiver.terminate(); +// } } http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java b/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java index a501507..903fa58 100644 --- a/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java +++ b/components/org.apache.stratos.cartridge.agent/src/test/java/org/apache/stratos/cartridge/agent/test/JavaCartridgeAgentTest.java @@ -126,9 +126,9 @@ public class JavaCartridgeAgentTest { //topologyEventReceiver.setExecutorService(executorService); //topologyEventReceiver.execute(); - instanceStatusEventReceiver = new InstanceStatusEventReceiver(); - instanceStatusEventReceiver.setExecutorService(executorService); - instanceStatusEventReceiver.execute(); + instanceStatusEventReceiver = InstanceStatusEventReceiver.getInstance(); +// instanceStatusEventReceiver.setExecutorService(executorService); +// instanceStatusEventReceiver.execute(); instanceStarted = false; instanceStatusEventReceiver.addEventListener(new InstanceStartedEventListener() { @@ -191,8 +191,8 @@ public class JavaCartridgeAgentTest { } catch (Exception ignore) { } - this.instanceStatusEventReceiver.terminate(); - this.topologyEventReceiver.terminate(); + //this.instanceStatusEventReceiver.terminate(); + // this.topologyEventReceiver.terminate(); this.instanceActivated = false; this.instanceStarted = false; http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java index c4c0336..267d5a8 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java @@ -162,8 +162,8 @@ public class CloudControllerServiceComponent { } instanceStatusTopicReceiver = new InstanceStatusTopicReceiver(); - instanceStatusTopicReceiver.setExecutorService(executorService); - instanceStatusTopicReceiver.execute(); +// instanceStatusTopicReceiver.setExecutorService(executorService); +// instanceStatusTopicReceiver.execute(); if (log.isInfoEnabled()) { log.info("Instance status event receiver thread started"); http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java index e212fea..bfa205b 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java @@ -46,21 +46,21 @@ public class InstanceStatusTopicReceiver { private ExecutorService executorService; public InstanceStatusTopicReceiver() { - this.statusEventReceiver = new InstanceStatusEventReceiver(); + this.statusEventReceiver = InstanceStatusEventReceiver.getInstance(); addEventListeners(); } - public void execute() { - statusEventReceiver.setExecutorService(executorService); - statusEventReceiver.execute(); - if (log.isInfoEnabled()) { - log.info("Cloud controller application status thread started"); - } - - if (log.isInfoEnabled()) { - log.info("Cloud controller application status thread terminated"); - } - } +// public void execute() { +// statusEventReceiver.setExecutorService(executorService); +// statusEventReceiver.execute(); +// if (log.isInfoEnabled()) { +// log.info("Cloud controller application status thread started"); +// } +// +// if (log.isInfoEnabled()) { +// log.info("Cloud controller application status thread terminated"); +// } +// } private void addEventListeners() { statusEventReceiver.addEventListener(new InstanceActivatedEventListener() { http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java index c0ae8ae..687cec2 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java @@ -36,8 +36,8 @@ public class StratosThreadPool { private static final Log log = LogFactory.getLog(StratosThreadPool.class); - private static Map<String, ExecutorService> executorServiceMap = new ConcurrentHashMap<String, ExecutorService>(); - private static Map<String, ScheduledExecutorService> scheduledServiceMap = new ConcurrentHashMap<String, ScheduledExecutorService>(); + private static volatile Map<String, ExecutorService> executorServiceMap = new ConcurrentHashMap<>(); + private static volatile Map<String, ScheduledExecutorService> scheduledServiceMap = new ConcurrentHashMap<String, ScheduledExecutorService>(); private static Object executorServiceMapLock = new Object(); private static Object scheduledServiceMapLock = new Object(); http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java index 573c19d..c4d68ae 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/internal/StratosManagerServiceComponent.java @@ -202,8 +202,8 @@ public class StratosManagerServiceComponent { */ private void initializeInstanceStatusEventReceiver() { instanceStatusEventReceiver = new StratosManagerInstanceStatusEventReceiver(); - instanceStatusEventReceiver.setExecutorService(executorService); - instanceStatusEventReceiver.execute(); +// instanceStatusEventReceiver.setExecutorService(executorService); +// instanceStatusEventReceiver.execute(); } /** http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInstanceStatusEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInstanceStatusEventReceiver.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInstanceStatusEventReceiver.java index 1da448e..ab92d1b 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInstanceStatusEventReceiver.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/messaging/receiver/StratosManagerInstanceStatusEventReceiver.java @@ -37,32 +37,34 @@ import java.util.List; /** * Stratos manager instance status event receiver. */ -public class StratosManagerInstanceStatusEventReceiver extends InstanceStatusEventReceiver { +public class StratosManagerInstanceStatusEventReceiver { private static final Log log = LogFactory.getLog(StratosManagerInstanceStatusEventReceiver.class); private ApplicationSignUpHandler signUpManager; private ArtifactDistributionCoordinator artifactDistributionCoordinator; + private InstanceStatusEventReceiver instanceStatusEventReceiver; public StratosManagerInstanceStatusEventReceiver() { signUpManager = new ApplicationSignUpHandler(); artifactDistributionCoordinator = new ArtifactDistributionCoordinator(); + instanceStatusEventReceiver = InstanceStatusEventReceiver.getInstance(); addEventListeners(); } - @Override - public void execute() { - super.execute(); - - if (log.isInfoEnabled()) { - log.info("Stratos manager instance status event receiver thread started"); - } - } +// @Override +// public void execute() { +// super.execute(); +// +// if (log.isInfoEnabled()) { +// log.info("Stratos manager instance status event receiver thread started"); +// } +// } private void addEventListeners() { - addEventListener(new InstanceStartedEventListener() { + instanceStatusEventReceiver.addEventListener(new InstanceStartedEventListener() { @Override protected void onEvent(Event event) { InstanceStartedEvent instanceStartedEvent = (InstanceStartedEvent) event; http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java index 4ad6572..cfc7f11 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/notifier/InstanceNotifierEventReceiver.java @@ -21,64 +21,107 @@ package org.apache.stratos.messaging.message.receiver.instance.notifier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.messaging.broker.subscribe.EventSubscriber; import org.apache.stratos.messaging.listener.EventListener; +import org.apache.stratos.messaging.message.receiver.StratosEventReceiver; import org.apache.stratos.messaging.util.MessagingUtil; /** * A thread for receiving instance notifier information from message broker. */ -public class InstanceNotifierEventReceiver { +public class InstanceNotifierEventReceiver extends StratosEventReceiver { private static final Log log = LogFactory.getLog(InstanceNotifierEventReceiver.class); private final InstanceNotifierEventMessageDelegator messageDelegator; private EventSubscriber eventSubscriber; - private boolean terminated; + private InstanceNotifierEventMessageListener messageListener; + private static volatile InstanceNotifierEventReceiver instance; + //private boolean terminated; - public InstanceNotifierEventReceiver() { + private InstanceNotifierEventReceiver() { + // TODO: make pool size configurable + this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100); InstanceNotifierEventMessageQueue messageQueue = new InstanceNotifierEventMessageQueue(); this.messageDelegator = new InstanceNotifierEventMessageDelegator(messageQueue); - InstanceNotifierEventMessageListener messageListener = new InstanceNotifierEventMessageListener(messageQueue); + messageListener = new InstanceNotifierEventMessageListener(messageQueue); // Start topic subscriber thread eventSubscriber = new EventSubscriber(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName(), messageListener); + execute(); + } + + public static InstanceNotifierEventReceiver getInstance () { + if (instance == null) { + synchronized (InstanceNotifierEventReceiver.class) { + if (instance == null) { + instance = new InstanceNotifierEventReceiver(); + } + } + } + + return instance; } public void addEventListener(EventListener eventListener) { messageDelegator.addEventListener(eventListener); } - public void execute() { - synchronized (this) { - if (terminated) { - log.info("InstanceNotifierEventReceiver has been terminated. Event subscriber will not be created."); - return; +// public void execute() { +// synchronized (this) { +// if (terminated) { +// log.info("InstanceNotifierEventReceiver has been terminated. Event subscriber will not be created."); +// return; +// } +// try { +// Thread subscriberThread = new Thread(eventSubscriber); +// subscriberThread.start(); +// if (log.isDebugEnabled()) { +// log.debug("InstanceNotifier event message receiver thread started"); +// } +// +// // Start instance notifier event message delegator thread +// Thread receiverThread = new Thread(messageDelegator); +// receiverThread.start(); +// if (log.isDebugEnabled()) { +// log.debug("InstanceNotifier event message delegator thread started"); +// } +// } catch (Exception e) { +// if (log.isErrorEnabled()) { +// log.error("InstanceNotifier receiver failed", e); +// } +// } +// } +// log.info("InstanceNotifierEventReceiver started"); +// +// // Keep the thread live until terminated +// while (!terminated) { +// try { +// Thread.sleep(2000); +// } catch (InterruptedException ignore) { +// } +// } +// } + + private void execute() { + try { + // Start topic subscriber thread + eventSubscriber = new EventSubscriber(MessagingUtil.Topics.INSTANCE_NOTIFIER_TOPIC.getTopicName(), + messageListener); + executorService.execute(eventSubscriber); + + if (log.isDebugEnabled()) { + log.debug("Instance Notifier event message receiver thread started"); } - try { - Thread subscriberThread = new Thread(eventSubscriber); - subscriberThread.start(); - if (log.isDebugEnabled()) { - log.debug("InstanceNotifier event message receiver thread started"); - } - // Start instance notifier event message delegator thread - Thread receiverThread = new Thread(messageDelegator); - receiverThread.start(); - if (log.isDebugEnabled()) { - log.debug("InstanceNotifier event message delegator thread started"); - } - } catch (Exception e) { - if (log.isErrorEnabled()) { - log.error("InstanceNotifier receiver failed", e); - } + // Start topology event message delegator thread + executorService.execute(messageDelegator); + if (log.isDebugEnabled()) { + log.debug("Instance Notifier event message delegator thread started"); } - } - log.info("InstanceNotifierEventReceiver started"); - // Keep the thread live until terminated - while (!terminated) { - try { - Thread.sleep(2000); - } catch (InterruptedException ignore) { + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("Instance Notifier receiver failed", e); } } } @@ -90,7 +133,7 @@ public class InstanceNotifierEventReceiver { public synchronized void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); - terminated = true; + //terminated = true; log.info("InstanceNotifierEventReceiver terminated"); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java index 41f444e..a2a1623 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/instance/status/InstanceStatusEventReceiver.java @@ -21,27 +21,41 @@ package org.apache.stratos.messaging.message.receiver.instance.status; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.messaging.broker.subscribe.EventSubscriber; import org.apache.stratos.messaging.listener.EventListener; +import org.apache.stratos.messaging.message.receiver.StratosEventReceiver; import org.apache.stratos.messaging.util.MessagingUtil; -import java.util.concurrent.ExecutorService; - /** * A thread for receiving instance notifier information from message broker. */ -public class InstanceStatusEventReceiver { +public class InstanceStatusEventReceiver extends StratosEventReceiver { private static final Log log = LogFactory.getLog(InstanceStatusEventReceiver.class); private final InstanceStatusEventMessageDelegator messageDelegator; private final InstanceStatusEventMessageListener messageListener; private EventSubscriber eventSubscriber; - private boolean terminated; - private ExecutorService executorService; + private static volatile InstanceStatusEventReceiver instance; - public InstanceStatusEventReceiver() { + private InstanceStatusEventReceiver() { + // TODO: make pool size configurable + this.executorService = StratosThreadPool.getExecutorService("topology-event-receiver", 100); InstanceStatusEventMessageQueue messageQueue = new InstanceStatusEventMessageQueue(); this.messageDelegator = new InstanceStatusEventMessageDelegator(messageQueue); this.messageListener = new InstanceStatusEventMessageListener(messageQueue); + execute(); + } + + public static InstanceStatusEventReceiver getInstance () { + if (instance == null) { + synchronized (InstanceStatusEventReceiver.class) { + if (instance == null) { + instance = new InstanceStatusEventReceiver(); + } + } + } + + return instance; } public void addEventListener(EventListener eventListener) { @@ -49,7 +63,7 @@ public class InstanceStatusEventReceiver { } - public void execute() { + private void execute() { try { // Start topic subscriber thread eventSubscriber = new EventSubscriber(MessagingUtil.Topics.INSTANCE_STATUS_TOPIC.getTopicName(), messageListener); @@ -77,14 +91,14 @@ public class InstanceStatusEventReceiver { public void terminate() { eventSubscriber.terminate(); messageDelegator.terminate(); - terminated = true; - } - - public ExecutorService getExecutorService() { - return executorService; + // terminated = true; } - public void setExecutorService(ExecutorService executorService) { - this.executorService = executorService; - } +// public ExecutorService getExecutorService() { +// return executorService; +// } +// +// public void setExecutorService(ExecutorService executorService) { +// this.executorService = executorService; +// } } http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java index c752f9e..9886335 100644 --- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java +++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/services/impl/MockInstance.java @@ -161,7 +161,7 @@ public class MockInstance implements Serializable { } private void startInstanceNotifierEventReceiver() { - instanceNotifierEventReceiver = new InstanceNotifierEventReceiver(); + instanceNotifierEventReceiver = InstanceNotifierEventReceiver.getInstance(); instanceNotifierEventReceiver.addEventListener(new InstanceCleanupClusterEventListener() { @Override protected void onEvent(Event event) { @@ -185,17 +185,17 @@ public class MockInstance implements Serializable { }); // TODO: Fix InstanceNotifierEventReceiver to use executor service // do not remove this since execute() is a blocking call - eventListenerExecutorService.submit(new Runnable() { - @Override - public void run() { - instanceNotifierEventReceiver.execute(); - } - }); - if (log.isDebugEnabled()) { - log.debug(String.format( - "Mock instance instance notifier event message receiver started for mock member [member-id] %s", - mockInstanceContext.getMemberId())); - } +// eventListenerExecutorService.submit(new Runnable() { +// @Override +// public void run() { +// instanceNotifierEventReceiver.execute(); +// } +// }); +// if (log.isDebugEnabled()) { +// log.debug(String.format( +// "Mock instance instance notifier event message receiver started for mock member [member-id] %s", +// mockInstanceContext.getMemberId())); +// } } private void handleMemberTermination() { @@ -213,9 +213,9 @@ public class MockInstance implements Serializable { healthStatNotifierScheduledFuture.cancel(true); } - private void stopInstanceNotifierReceiver() { - instanceNotifierEventReceiver.terminate(); - } +// private void stopInstanceNotifierReceiver() { +// instanceNotifierEventReceiver.terminate(); +// } public MockInstanceContext getMockInstanceContext() { return mockInstanceContext; @@ -223,7 +223,7 @@ public class MockInstance implements Serializable { public synchronized void terminate() { if (MemberStatus.Initialized.equals(memberStatus)) { - stopInstanceNotifierReceiver(); + //stopInstanceNotifierReceiver(); stopHealthStatisticsPublisher(); memberStatus = MemberStatus.Terminated; if (log.isInfoEnabled()) { http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java index f31583c..308dde0 100644 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java @@ -128,9 +128,9 @@ public abstract class PythonAgentIntegrationTest { // topologyEventReceiver.setExecutorService(executorService); // topologyEventReceiver.execute(); - instanceStatusEventReceiver = new InstanceStatusEventReceiver(); - instanceStatusEventReceiver.setExecutorService(executorService); - instanceStatusEventReceiver.execute(); + instanceStatusEventReceiver = InstanceStatusEventReceiver.getInstance(); +// instanceStatusEventReceiver.setExecutorService(executorService); +// instanceStatusEventReceiver.execute(); instanceStatusEventReceiver.addEventListener(new InstanceStartedEventListener() { @Override http://git-wip-us.apache.org/repos/asf/stratos/blob/82980e64/products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/adc/GitHookTestCase.java ---------------------------------------------------------------------- diff --git a/products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/adc/GitHookTestCase.java b/products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/adc/GitHookTestCase.java index 3f708db..7412540 100644 --- a/products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/adc/GitHookTestCase.java +++ b/products/stratos/modules/integration/test-integration/src/test/java/org/apache/stratos/integration/tests/adc/GitHookTestCase.java @@ -56,19 +56,19 @@ public class GitHookTestCase extends StratosIntegrationTest { private static final String appPolicyId = "application-policy-git-hook-test"; private static final String GIT_HOOK_ARTIFACT_FILENAME = "hook-req.json"; private static final int ARTIFACT_UPDATED_EXPECTED_COUNT = 2; - private ExecutorService eventListenerExecutorService = StratosThreadPool - .getExecutorService("stratos.integration.test.git.thread.pool", 5); +// private ExecutorService eventListenerExecutorService = StratosThreadPool +// .getExecutorService("stratos.integration.test.git.thread.pool", 5); @Test(timeOut = DEFAULT_TEST_TIMEOUT) public void sendRepoNotify() throws Exception { deployArtifacts(); - final InstanceNotifierEventReceiver instanceNotifierEventReceiver = new InstanceNotifierEventReceiver(); - eventListenerExecutorService.submit(new Runnable() { - @Override - public void run() { - instanceNotifierEventReceiver.execute(); - } - }); + final InstanceNotifierEventReceiver instanceNotifierEventReceiver = InstanceNotifierEventReceiver.getInstance(); +// eventListenerExecutorService.submit(new Runnable() { +// @Override +// public void run() { +// instanceNotifierEventReceiver.execute(); +// } +// }); ArtifactUpdateEventListener artifactUpdateEventListener = new ArtifactUpdateEventListener() { @Override @@ -86,8 +86,8 @@ public class GitHookTestCase extends StratosIntegrationTest { Thread.sleep(1000); } TopologyHandler.getInstance().assertApplicationActiveStatus(applicationId); - instanceNotifierEventReceiver.terminate(); - eventListenerExecutorService.shutdownNow(); + //instanceNotifierEventReceiver.terminate(); + // eventListenerExecutorService.shutdownNow(); undeployArtifacts(); }
