adding activator class for messaging and calling terminate of event recievers in de-activation of te bundle
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/4c6442ac Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/4c6442ac Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/4c6442ac Branch: refs/heads/stratos-4.1.x Commit: 4c6442ac2072d731781ab4edc9e0c025941ba5d2 Parents: fce0421 Author: Isuru Haththotuwa <[email protected]> Authored: Thu Dec 17 23:17:57 2015 +0530 Committer: Isuru Haththotuwa <[email protected]> Committed: Thu Dec 17 23:17:57 2015 +0530 ---------------------------------------------------------------------- ...LoadBalancerCommonTopologyEventReceiver.java | 7 -- .../internal/LoadBalancerServiceComponent.java | 82 ++------------------ .../internal/MessagingServiceComponent.java | 66 ++++++++++++++++ .../status/ClusterStatusEventReceiver.java | 5 ++ .../mapping/DomainMappingEventReceiver.java | 5 ++ .../health/stat/HealthStatEventReceiver.java | 5 ++ 6 files changed, 89 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/4c6442ac/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java index 85142e3..93f391f 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java @@ -59,13 +59,6 @@ public class LoadBalancerCommonTopologyEventReceiver { } } -// public void execute() { -// super.execute(); -// if (log.isInfoEnabled()) { -// log.info("Load balancer topology receiver thread started"); -// } -// } - public void initializeTopology() { if (initialized) { return; http://git-wip-us.apache.org/repos/asf/stratos/blob/4c6442ac/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java index 3786af8..b235208 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java @@ -25,7 +25,6 @@ import org.apache.axis2.engine.AxisConfiguration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.common.services.DistributedObjectProvider; -import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.load.balancer.common.event.receivers.LoadBalancerCommonApplicationSignUpEventReceiver; import org.apache.stratos.load.balancer.common.statistics.notifier.LoadBalancerStatisticsNotifier; import org.apache.stratos.load.balancer.common.topology.TopologyProvider; @@ -38,7 +37,6 @@ import org.apache.stratos.load.balancer.event.receivers.LoadBalancerDomainMappin import org.apache.stratos.load.balancer.event.receivers.LoadBalancerTopologyEventReceiver; import org.apache.stratos.load.balancer.exception.TenantAwareLoadBalanceEndpointException; import org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsCollector; -import org.apache.stratos.load.balancer.util.LoadBalancerConstants; import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter; import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter; @@ -63,7 +61,6 @@ import org.wso2.carbon.utils.multitenancy.MultitenantConstants; import java.io.File; import java.util.Collection; -import java.util.concurrent.ExecutorService; /** * @scr.component name="org.apache.stratos.load.balancer.internal.LoadBalancerServiceComponent" immediate="true" @@ -90,7 +87,6 @@ public class LoadBalancerServiceComponent { private static final Log log = LogFactory.getLog(LoadBalancerServiceComponent.class); private boolean activated = false; - private ExecutorService executorService; private LoadBalancerTopologyEventReceiver topologyEventReceiver; private TenantEventReceiver tenantEventReceiver; private LoadBalancerDomainMappingEventReceiver domainMappingEventReceiver; @@ -124,11 +120,6 @@ public class LoadBalancerServiceComponent { // Configure topology filters TopologyFilterConfigurator.configure(configuration); - int threadPoolSize = Integer.getInteger(LoadBalancerConstants.LOAD_BALANCER_THREAD_POOL_SIZE_KEY, - LoadBalancerConstants.LOAD_BALANCER_DEFAULT_THREAD_POOL_SIZE); - executorService = StratosThreadPool.getExecutorService(LoadBalancerConstants.LOAD_BALANCER_THREAD_POOL_ID, - threadPoolSize); - TopologyProvider topologyProvider = LoadBalancerConfiguration.getInstance().getTopologyProvider(); if (topologyProvider == null) { topologyProvider = new TopologyProvider(); @@ -137,18 +128,18 @@ public class LoadBalancerServiceComponent { if (configuration.isMultiTenancyEnabled() || configuration.isDomainMappingEnabled()) { // Start tenant & application signup event receivers - startTenantEventReceiver(executorService); - startApplicationSignUpEventReceiver(executorService, topologyProvider); + startTenantEventReceiver(); + startApplicationSignUpEventReceiver(topologyProvider); } if (configuration.isDomainMappingEnabled()) { // Start domain mapping event receiver - startDomainMappingEventReceiver(executorService, topologyProvider); + startDomainMappingEventReceiver(topologyProvider); } if (configuration.isTopologyEventListenerEnabled()) { // Start topology receiver - startTopologyEventReceiver(executorService, topologyProvider); + startTopologyEventReceiver(topologyProvider); } if (configuration.isCepStatsPublisherEnabled()) { @@ -167,43 +158,28 @@ public class LoadBalancerServiceComponent { } } - private void startDomainMappingEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) { + private void startDomainMappingEventReceiver(TopologyProvider topologyProvider) { if (domainMappingEventReceiver != null) { return; } domainMappingEventReceiver = new LoadBalancerDomainMappingEventReceiver(topologyProvider); -// domainMappingEventReceiver.setExecutorService(executorService); -// domainMappingEventReceiver.execute(); -// if (log.isInfoEnabled()) { -// log.info("Domain mapping event receiver thread started"); -// } } - private void startApplicationSignUpEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) { + private void startApplicationSignUpEventReceiver(TopologyProvider topologyProvider) { if (applicationSignUpEventReceiver != null) { return; } applicationSignUpEventReceiver = new LoadBalancerCommonApplicationSignUpEventReceiver(topologyProvider); -// applicationSignUpEventReceiver.setExecutorService(executorService); -// applicationSignUpEventReceiver.execute(); - if (log.isInfoEnabled()) { - log.info("Application signup event receiver thread started"); - } } - private void startTopologyEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) { + private void startTopologyEventReceiver(TopologyProvider topologyProvider) { if (topologyEventReceiver != null) { return; } topologyEventReceiver = new LoadBalancerTopologyEventReceiver(topologyProvider); -// topologyEventReceiver.setExecutorService(executorService); -// topologyEventReceiver.execute(); -// if (log.isInfoEnabled()) { -// log.info("Topology receiver thread started"); -// } if (log.isInfoEnabled()) { if (TopologyServiceFilter.getInstance().isActive()) { @@ -223,14 +199,8 @@ public class LoadBalancerServiceComponent { } } - private void startTenantEventReceiver(ExecutorService executorService) { - + private void startTenantEventReceiver() { tenantEventReceiver = TenantEventReceiver.getInstance(); -// tenantEventReceiver.setExecutorService(executorService); -// tenantEventReceiver.execute(); - if (log.isInfoEnabled()) { - log.info("Tenant event receiver thread started"); - } } private void startStatisticsNotifier(TopologyProvider topologyProvider) { @@ -256,33 +226,6 @@ public class LoadBalancerServiceComponent { log.warn("An error occurred while removing endpoint deployer", e); } - // Terminate topology receiver -// if (topologyEventReceiver != null) { -// try { -// topologyEventReceiver.terminate(); -// } catch (Exception e) { -// log.warn("An error occurred while terminating topology event receiver", e); -// } -// } - - // Terminate application signup event receiver -// if (applicationSignUpEventReceiver != null) { -// try { -// applicationSignUpEventReceiver.terminate(); -// } catch (Exception e) { -// log.warn("An error occurred while terminating application signup event receiver", e); -// } -// } - - // Terminate domain mapping event receiver -// if (domainMappingEventReceiver != null) { -// try { -// domainMappingEventReceiver.terminate(); -// } catch (Exception e) { -// log.warn("An error occurred while terminating domain mapping event receiver", e); -// } -// } - // Terminate statistics notifier if (statisticsNotifier != null) { try { @@ -291,15 +234,6 @@ public class LoadBalancerServiceComponent { log.warn("An error occurred while terminating health statistics notifier", e); } } - - // Shutdown executor service - if (executorService != null) { - try { - executorService.shutdownNow(); - } catch (Exception e) { - log.warn("An error occurred while shutting down load balancer executor service", e); - } - } } /** http://git-wip-us.apache.org/repos/asf/stratos/blob/4c6442ac/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java new file mode 100644 index 0000000..c97125b --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/internal/MessagingServiceComponent.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.internal; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.message.receiver.application.ApplicationsEventReceiver; +import org.apache.stratos.messaging.message.receiver.application.signup.ApplicationSignUpEventReceiver; +import org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver; +import org.apache.stratos.messaging.message.receiver.domain.mapping.DomainMappingEventReceiver; +import org.apache.stratos.messaging.message.receiver.health.stat.HealthStatEventReceiver; +import org.apache.stratos.messaging.message.receiver.initializer.InitializerEventReceiver; +import org.apache.stratos.messaging.message.receiver.tenant.TenantEventReceiver; +import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; +import org.osgi.service.component.ComponentContext; + +/** + * @scr.component name="org.apache.stratos.messaging.internal.MessagingServiceComponent" + * immediate="true" + */ +public class MessagingServiceComponent { + + private static final Log log = LogFactory.getLog(MessagingServiceComponent.class); + + protected void activate(ComponentContext context) { + try { + log.info("Messaging Service bundle activated"); + } catch (Exception e) { + log.error("Could not activate Messaging Service component", e); + } + } + + protected void deactivate(ComponentContext context) { + // deactivate all message receivers + try { + ApplicationSignUpEventReceiver.getInstance().terminate(); + ApplicationsEventReceiver.getInstance().terminate(); + ClusterStatusEventReceiver.getInstance().terminate(); + DomainMappingEventReceiver.getInstance().terminate(); + HealthStatEventReceiver.getInstance().terminate(); + InitializerEventReceiver.getInstance().terminate(); + TenantEventReceiver.getInstance().terminate(); + TopologyEventReceiver.getInstance().terminate(); + log.info("Messaging Service component is deactivated"); + } catch (Exception e) { + log.error("Could not de-activate Messaging Service component", e); + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/4c6442ac/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java index e191799..be42b43 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/cluster/status/ClusterStatusEventReceiver.java @@ -87,6 +87,11 @@ public class ClusterStatusEventReceiver extends StratosEventReceiver { } } + public void terminate() { + eventSubscriber.terminate(); + messageDelegator.terminate(); + } + public boolean isSubscribed() { return ((eventSubscriber != null) && (eventSubscriber.isSubscribed())); } http://git-wip-us.apache.org/repos/asf/stratos/blob/4c6442ac/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java index 4e4c04b..6de99c0 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/domain/mapping/DomainMappingEventReceiver.java @@ -64,6 +64,11 @@ public class DomainMappingEventReceiver extends StratosEventReceiver { return instance; } + public void terminate() { + eventSubscriber.terminate(); + messageDelegator.terminate(); + } + private void execute() { try { // Start topic subscriber thread http://git-wip-us.apache.org/repos/asf/stratos/blob/4c6442ac/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java index ba124a7..a9d2602 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventReceiver.java @@ -82,4 +82,9 @@ public class HealthStatEventReceiver extends StratosEventReceiver { } } } + + public void terminate() { + eventSubscriber.terminate(); + messageDelegator.terminate(); + } }
