Make TopologyEventReceiver a singleton and update references to use TopologyEventReceiver.getInstance()
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/b3dc5462 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/b3dc5462 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/b3dc5462 Branch: refs/heads/master Commit: b3dc54628ea70b64835945e3061021c5f8c41de4 Parents: 78db9f1 Author: Isuru Haththotuwa <[email protected]> Authored: Wed Dec 2 17:28:00 2015 +0530 Committer: Isuru Haththotuwa <[email protected]> Committed: Thu Dec 24 20:03:55 2015 +0530 ---------------------------------------------------------------------- .../AutoscalerTopologyEventReceiver.java | 24 +++---- .../internal/AutoscalerServiceComponent.java | 18 ++--- .../stratos/cartridge/agent/CartridgeAgent.java | 28 ++++---- .../agent/CartridgeAgentEventListeners.java | 72 ++++++++++---------- .../extension/api/LoadBalancerExtension.java | 33 ++++----- .../internal/LoadBalancerServiceComponent.java | 24 +++---- .../service/MetadataTopologyEventReceiver.java | 35 +++++----- .../service/registry/MetadataApiRegistry.java | 8 +-- .../cep/extension/CEPTopologyEventReceiver.java | 20 +++--- .../extension/FaultHandlingWindowProcessor.java | 10 +-- .../cep/extension/CEPTopologyEventReceiver.java | 20 +++--- .../extension/FaultHandlingWindowProcessor.java | 10 +-- .../tests/PythonAgentIntegrationTest.java | 6 +- .../integration/common/TopologyHandler.java | 12 ++-- 14 files changed, 163 insertions(+), 157 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java index 6fd64a7..daa70ae 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java @@ -510,16 +510,16 @@ public class AutoscalerTopologyEventReceiver { /** * Terminate load balancer topology receiver thread. */ - public void terminate() { - topologyEventReceiver.terminate(); - terminated = true; - } - - public ExecutorService getExecutorService() { - return executorService; - } - - public void setExecutorService(ExecutorService executorService) { - this.executorService = executorService; - } +// public void terminate() { +// topologyEventReceiver.terminate(); +// terminated = true; +// } +// +// public ExecutorService getExecutorService() { +// return executorService; +// } +// +// public void setExecutorService(ExecutorService executorService) { +// this.executorService = executorService; +// } } http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java index 76844a0..4d4c54f 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java +++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java @@ -173,8 +173,8 @@ public class AutoscalerServiceComponent { // Start topology receiver asTopologyReceiver = new AutoscalerTopologyEventReceiver(); - asTopologyReceiver.setExecutorService(executorService); - asTopologyReceiver.execute(); +// asTopologyReceiver.setExecutorService(executorService); + //asTopologyReceiver.execute(); if (log.isDebugEnabled()) { log.debug("Topology receiver executor service started"); } @@ -245,13 +245,13 @@ public class AutoscalerServiceComponent { } protected void deactivate(ComponentContext context) { - if (asTopologyReceiver != null) { - try { - asTopologyReceiver.terminate(); - } catch (Exception e) { - log.warn("An error occurred while terminating autoscaler topology event receiver", e); - } - } +// if (asTopologyReceiver != null) { +// try { +// asTopologyReceiver.terminate(); +// } catch (Exception e) { +// log.warn("An error occurred while terminating autoscaler topology event receiver", e); +// } +// } if (autoscalerHealthStatEventReceiver != null) { try { http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java index 18e6e0a..b0bf326 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgent.java @@ -60,10 +60,10 @@ public class CartridgeAgent implements Runnable { } // Start topology event 
receiver thread - registerTopologyEventListeners(); - if (log.isInfoEnabled()) { - log.info("Cartridge agent registerTopologyEventListeners done"); - } +// registerTopologyEventListeners(); +// if (log.isInfoEnabled()) { +// log.info("Cartridge agent registerTopologyEventListeners done"); +// } if (log.isInfoEnabled()) { log.info("Waiting for CompleteTopologyEvent.."); @@ -186,16 +186,16 @@ public class CartridgeAgent implements Runnable { } } - protected void registerTopologyEventListeners() { - if (log.isDebugEnabled()) { - log.debug("registerTopologyEventListeners before"); - } - eventListenerns.startTopologyEventReceiver(); - - if (log.isDebugEnabled()) { - log.debug("registerTopologyEventListeners after"); - } - } +// protected void registerTopologyEventListeners() { +// if (log.isDebugEnabled()) { +// log.debug("registerTopologyEventListeners before"); +// } +// eventListenerns.startTopologyEventReceiver(); +// +// if (log.isDebugEnabled()) { +// log.debug("registerTopologyEventListeners after"); +// } +// } // protected void registerTenantEventListeners() { // if (log.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java index 103d2c7..1d64ff0 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/CartridgeAgentEventListeners.java @@ -94,24 +94,24 @@ public class 
CartridgeAgentEventListeners { } } - public void startTopologyEventReceiver() { - - if (log.isDebugEnabled()) { - log.debug("Starting cartridge agent topology event message receiver"); - } - - eventListenerExecutorService.submit(new Runnable() { - @Override - public void run() { - topologyEventReceiver.execute(); - } - }); - - if (log.isInfoEnabled()) { - log.info("Cartridge agent topology receiver thread started, waiting for event messages ..."); - } - - } +// public void startTopologyEventReceiver() { +// +// if (log.isDebugEnabled()) { +// log.debug("Starting cartridge agent topology event message receiver"); +// } +// +// eventListenerExecutorService.submit(new Runnable() { +// @Override +// public void run() { +// topologyEventReceiver.execute(); +// } +// }); +// +// if (log.isInfoEnabled()) { +// log.info("Cartridge agent topology receiver thread started, waiting for event messages ..."); +// } +// +// } public void startInstanceNotifierReceiver() { @@ -131,24 +131,24 @@ public class CartridgeAgentEventListeners { } } - public void startTenantEventReceiver() { - - if (log.isDebugEnabled()) { - log.debug("Starting cartridge agent tenant event message receiver"); - } - - eventListenerExecutorService.submit(new Runnable() { - @Override - public void run() { - topologyEventReceiver.execute(); - } - }); - - if (log.isInfoEnabled()) { - log.info("Cartridge agent tenant receiver thread started, waiting for event messages ..."); - } - - } +// public void startTenantEventReceiver() { +// +// if (log.isDebugEnabled()) { +// log.debug("Starting cartridge agent tenant event message receiver"); +// } +// +// eventListenerExecutorService.submit(new Runnable() { +// @Override +// public void run() { +// topologyEventReceiver.execute(); +// } +// }); +// +// if (log.isInfoEnabled()) { +// log.info("Cartridge agent tenant receiver thread started, waiting for event messages ..."); +// } +// +// } // public void startApplicationsEventReceiver() { // 
http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java index e7a2071..d2a8cb3 100644 --- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java +++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java @@ -39,6 +39,7 @@ import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilte import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; import org.apache.stratos.messaging.message.receiver.application.ApplicationsEventReceiver; +import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; import java.util.concurrent.ExecutorService; @@ -123,8 +124,8 @@ public class LoadBalancerExtension { addTopologyEventListeners(topologyEventReceiver); // Add default topology provider event listeners topologyEventReceiver.addEventListeners(); - topologyEventReceiver.setExecutorService(executorService); - topologyEventReceiver.execute(); +// topologyEventReceiver.setExecutorService(executorService); +// topologyEventReceiver.execute(); if (log.isInfoEnabled()) { log.info("Topology receiver thread started"); } @@ -212,12 +213,12 @@ public class LoadBalancerExtension { * @param topologyEventReceiver topology event receiver instance 
*/ private void addTopologyEventListeners(final LoadBalancerCommonTopologyEventReceiver topologyEventReceiver) { - topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() { + TopologyEventReceiver.getInstance().addEventListener(new CompleteTopologyEventListener() { @Override protected void onEvent(Event event) { try { - if (!loadBalancerStarted) { + if (!loadBalancerStarted) { configureAndStart(); } } catch (Exception e) { @@ -228,37 +229,37 @@ public class LoadBalancerExtension { } } }); - topologyEventReceiver.addEventListener(new MemberActivatedEventListener() { + TopologyEventReceiver.getInstance().addEventListener(new MemberActivatedEventListener() { @Override protected void onEvent(Event event) { reloadConfiguration(); } }); - topologyEventReceiver.addEventListener(new MemberSuspendedEventListener() { + TopologyEventReceiver.getInstance().addEventListener(new MemberSuspendedEventListener() { @Override protected void onEvent(Event event) { reloadConfiguration(); } }); - topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() { + TopologyEventReceiver.getInstance().addEventListener(new MemberTerminatedEventListener() { @Override protected void onEvent(Event event) { reloadConfiguration(); } }); - topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() { + TopologyEventReceiver.getInstance().addEventListener(new ClusterRemovedEventListener() { @Override protected void onEvent(Event event) { reloadConfiguration(); } }); - topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() { + TopologyEventReceiver.getInstance().addEventListener(new ServiceRemovedEventListener() { @Override protected void onEvent(Event event) { reloadConfiguration(); } }); - topologyEventReceiver.addEventListener(new MemberMaintenanceListener() { + TopologyEventReceiver.getInstance().addEventListener(new MemberMaintenanceListener() { @Override protected void onEvent(Event event) { reloadConfiguration(); @@ -338,12 +339,12 
@@ public class LoadBalancerExtension { * Stop load balancer instance. */ public void stop() { - try { - if (topologyEventReceiver != null) { - topologyEventReceiver.terminate(); - } - } catch (Exception ignore) { - } +// try { +// if (topologyEventReceiver != null) { +// topologyEventReceiver.terminate(); +// } +// } catch (Exception ignore) { +// } try { if (statisticsNotifier != null) { http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java index cb74984..442686a 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java @@ -199,11 +199,11 @@ public class LoadBalancerServiceComponent { } topologyEventReceiver = new LoadBalancerTopologyEventReceiver(topologyProvider); - topologyEventReceiver.setExecutorService(executorService); - topologyEventReceiver.execute(); - if (log.isInfoEnabled()) { - log.info("Topology receiver thread started"); - } +// topologyEventReceiver.setExecutorService(executorService); +// topologyEventReceiver.execute(); +// if (log.isInfoEnabled()) { +// log.info("Topology receiver thread started"); +// } if (log.isInfoEnabled()) { if (TopologyServiceFilter.getInstance().isActive()) { @@ -257,13 +257,13 @@ public class LoadBalancerServiceComponent { } // Terminate topology receiver - if (topologyEventReceiver != null) { - try { - 
topologyEventReceiver.terminate(); - } catch (Exception e) { - log.warn("An error occurred while terminating topology event receiver", e); - } - } +// if (topologyEventReceiver != null) { +// try { +// topologyEventReceiver.terminate(); +// } catch (Exception e) { +// log.warn("An error occurred while terminating topology event receiver", e); +// } +// } // Terminate application signup event receiver // if (applicationSignUpEventReceiver != null) { http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java index e516271..f16282d 100644 --- a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java +++ b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/MetadataTopologyEventReceiver.java @@ -41,8 +41,9 @@ public class MetadataTopologyEventReceiver { private ExecutorService executorService; public MetadataTopologyEventReceiver() { - this.topologyEventReceiver = new TopologyEventReceiver(); - executorService = StratosThreadPool.getExecutorService(Constants.METADATA_SERVICE_THREAD_POOL_ID, 20); + this.topologyEventReceiver = TopologyEventReceiver.getInstance(); +// //executorService = StratosThreadPool.getExecutorService(Constants +// .METADATA_SERVICE_THREAD_POOL_ID, 20); addEventListeners(); } @@ -67,21 +68,21 @@ public class MetadataTopologyEventReceiver { }); } - public void execute() { - topologyEventReceiver.setExecutorService(getExecutorService()); - 
topologyEventReceiver.execute(); - - if (log.isInfoEnabled()) { - log.info("Metadata service topology receiver started."); - } - } - - public void terminate() { - topologyEventReceiver.terminate(); - if (log.isInfoEnabled()) { - log.info("Metadata service topology receiver stopped."); - } - } +// public void execute() { +// topologyEventReceiver.setExecutorService(getExecutorService()); +// topologyEventReceiver.execute(); +// +// if (log.isInfoEnabled()) { +// log.info("Metadata service topology receiver started."); +// } +// } +// +// public void terminate() { +// topologyEventReceiver.terminate(); +// if (log.isInfoEnabled()) { +// log.info("Metadata service topology receiver stopped."); +// } +// } public ExecutorService getExecutorService() { return executorService; http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java index 75ddbc7..47fc600 100644 --- a/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java +++ b/components/org.apache.stratos.metadata.service/src/main/java/org/apache/stratos/metadata/service/registry/MetadataApiRegistry.java @@ -56,7 +56,7 @@ public class MetadataApiRegistry implements DataStore { public MetadataApiRegistry() { metadataTopologyEventReceiver = new MetadataTopologyEventReceiver(); - metadataTopologyEventReceiver.execute(); +// metadataTopologyEventReceiver.execute(); metadataApplicationEventReceiver = new MetadataApplicationEventReceiver(); 
metadataApplicationEventReceiver.execute(); @@ -417,9 +417,9 @@ public class MetadataApiRegistry implements DataStore { return applicationIdToReadWriteLockMap; } - public void stopTopologyReceiver() { - metadataTopologyEventReceiver.terminate(); - } +// public void stopTopologyReceiver() { +// metadataTopologyEventReceiver.terminate(); +// } public void stopApplicationReceiver() { metadataApplicationEventReceiver.terminate(); http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java index 59c70c5..2696271 100644 --- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java +++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java @@ -34,26 +34,28 @@ import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; /** * CEP Topology Receiver for Fault Handling Window Processor. 
*/ -public class CEPTopologyEventReceiver extends TopologyEventReceiver { +public class CEPTopologyEventReceiver { private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class); private FaultHandlingWindowProcessor faultHandler; + private TopologyEventReceiver topologyEventReceiver; public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) { this.faultHandler = faultHandler; + this.topologyEventReceiver = TopologyEventReceiver.getInstance(); addEventListeners(); } - @Override - public void execute() { - super.execute(); - log.info("CEP topology event receiver thread started"); - } +// @Override +// public void execute() { +// super.execute(); +// log.info("CEP topology event receiver thread started"); +// } private void addEventListeners() { // Load member time stamp map from the topology as a one time task - addEventListener(new CompleteTopologyEventListener() { + topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() { private boolean initialized; @Override @@ -74,7 +76,7 @@ public class CEPTopologyEventReceiver extends TopologyEventReceiver { }); // Remove member from the time stamp map when MemberTerminated event is received. 
- addEventListener(new MemberTerminatedEventListener() { + topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() { @Override protected void onEvent(Event event) { MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event; @@ -84,7 +86,7 @@ public class CEPTopologyEventReceiver extends TopologyEventReceiver { }); // Add member to time stamp map whenever member is activated - addEventListener(new MemberActivatedEventListener() { + topologyEventReceiver.addEventListener(new MemberActivatedEventListener() { @Override protected void onEvent(Event event) { MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event; http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java index 8d16b33..7aec0d5 100644 --- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java +++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java @@ -286,10 +286,10 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run MemberFaultEventMap .put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap); - executorService = StratosThreadPool - .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE); - cepTopologyEventReceiver.setExecutorService(executorService); - 
cepTopologyEventReceiver.execute(); +// executorService = StratosThreadPool +// .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE); +// cepTopologyEventReceiver.setExecutorService(executorService); +// cepTopologyEventReceiver.execute(); //Ordinary scheduling window.schedule(); @@ -329,7 +329,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run @Override public void destroy() { // terminate topology listener thread - cepTopologyEventReceiver.terminate(); +// cepTopologyEventReceiver.terminate(); window = null; // Shutdown executor service http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java index 59c70c5..2696271 100644 --- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java +++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java @@ -34,26 +34,28 @@ import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; /** * CEP Topology Receiver for Fault Handling Window Processor. 
*/ -public class CEPTopologyEventReceiver extends TopologyEventReceiver { +public class CEPTopologyEventReceiver { private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class); private FaultHandlingWindowProcessor faultHandler; + private TopologyEventReceiver topologyEventReceiver; public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) { this.faultHandler = faultHandler; + this.topologyEventReceiver = TopologyEventReceiver.getInstance(); addEventListeners(); } - @Override - public void execute() { - super.execute(); - log.info("CEP topology event receiver thread started"); - } +// @Override +// public void execute() { +// super.execute(); +// log.info("CEP topology event receiver thread started"); +// } private void addEventListeners() { // Load member time stamp map from the topology as a one time task - addEventListener(new CompleteTopologyEventListener() { + topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() { private boolean initialized; @Override @@ -74,7 +76,7 @@ public class CEPTopologyEventReceiver extends TopologyEventReceiver { }); // Remove member from the time stamp map when MemberTerminated event is received. 
- addEventListener(new MemberTerminatedEventListener() { + topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() { @Override protected void onEvent(Event event) { MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event; @@ -84,7 +86,7 @@ public class CEPTopologyEventReceiver extends TopologyEventReceiver { }); // Add member to time stamp map whenever member is activated - addEventListener(new MemberActivatedEventListener() { + topologyEventReceiver.addEventListener(new MemberActivatedEventListener() { @Override protected void onEvent(Event event) { MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event; http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java index eb07dd9..2abfda1 100644 --- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java +++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java @@ -279,10 +279,10 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run MemberFaultEventMap .put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap); - executorService = StratosThreadPool - .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE); - cepTopologyEventReceiver.setExecutorService(executorService); - 
cepTopologyEventReceiver.execute(); +// executorService = StratosThreadPool +// .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE); +// cepTopologyEventReceiver.setExecutorService(executorService); +// cepTopologyEventReceiver.execute(); //Ordinary scheduling window.schedule(); @@ -322,7 +322,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run @Override public void destroy() { // terminate topology listener thread - cepTopologyEventReceiver.terminate(); +// cepTopologyEventReceiver.terminate(); window = null; // Shutdown executor service http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java index d8cbc9f..c5751bd 100644 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java @@ -125,9 +125,9 @@ public abstract class PythonAgentIntegrationTest { } ExecutorService executorService = StratosThreadPool.getExecutorService("TEST_THREAD_POOL", testThreadPoolSize); - topologyEventReceiver = new TopologyEventReceiver(); - topologyEventReceiver.setExecutorService(executorService); - 
topologyEventReceiver.execute(); + topologyEventReceiver = TopologyEventReceiver.getInstance(); +// topologyEventReceiver.setExecutorService(executorService); +// topologyEventReceiver.execute(); instanceStatusEventReceiver = new InstanceStatusEventReceiver(); instanceStatusEventReceiver.setExecutorService(executorService); http://git-wip-us.apache.org/repos/asf/stratos/blob/b3dc5462/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java ---------------------------------------------------------------------- diff --git a/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java b/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java index 3af9866..e506ef7 100644 --- a/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java +++ b/products/stratos/modules/integration/test-common/src/main/java/org/apache/stratos/integration/common/TopologyHandler.java @@ -108,9 +108,9 @@ public class TopologyHandler { } private void initializeApplicationSignUpEventReceiver() { - applicationSignUpEventReceiver = new ApplicationSignUpEventReceiver(); - applicationSignUpEventReceiver.setExecutorService(executorService); - applicationSignUpEventReceiver.execute(); + applicationSignUpEventReceiver = ApplicationSignUpEventReceiver.getInstance(); +// applicationSignUpEventReceiver.setExecutorService(executorService); +// applicationSignUpEventReceiver.execute(); } private void initializeTenantEventReceiver() { @@ -171,8 +171,8 @@ public class TopologyHandler { * Initialize Topology event receiver */ private void initializeTopologyEventReceiver() { - topologyEventReceiver = new TopologyEventReceiver(); - topologyEventReceiver.setExecutorService(executorService); + topologyEventReceiver = TopologyEventReceiver.getInstance(); +// 
topologyEventReceiver.setExecutorService(executorService); topologyEventReceiver.addEventListener(new MemberActivatedEventListener() { @Override protected void onEvent(Event event) { @@ -206,7 +206,7 @@ public class TopologyHandler { clusterInstanceInactivateEvent.getClusterId())); } }); - topologyEventReceiver.execute(); + //topologyEventReceiver.execute(); } /**
