Repository: stratos
Updated Branches:
  refs/heads/master 5c502b266 -> de00e1fa3


Updating load balancer statistics reader to user topology provider


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/de00e1fa
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/de00e1fa
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/de00e1fa

Branch: refs/heads/master
Commit: de00e1fa3b76505a469f6536c8d0fac2bfe57b87
Parents: 5c502b2
Author: Imesh Gunaratne <[email protected]>
Authored: Sun Mar 8 10:08:23 2015 +0530
Committer: Imesh Gunaratne <[email protected]>
Committed: Sun Mar 8 10:08:45 2015 +0530

----------------------------------------------------------------------
 .../impl/CloudControllerServiceImpl.java        | 12 +----
 .../load/balancer/common/domain/Service.java    |  8 ++-
 ...LoadBalancerCommonTopologyEventReceiver.java | 14 +++--
 .../LoadBalancerStatisticsReader.java           |  2 +-
 .../LoadBalancerStatisticsNotifier.java         | 56 +++++++++-----------
 .../extension/api/LoadBalancerExtension.java    | 46 ++++++++++------
 .../internal/LoadBalancerServiceComponent.java  |  7 +--
 .../LoadBalancerStatisticsCollector.java        |  3 +-
 .../stratos/haproxy/extension/HAProxy.java      |  3 +-
 .../extension/HAProxyStatisticsReader.java      | 49 ++++++++---------
 .../apache/stratos/haproxy/extension/Main.java  | 29 ++++++++--
 11 files changed, 130 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
index 31eca08..d351134 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
@@ -118,7 +118,7 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
                handleNullObject(cartridgeConfig, "Cartridge definition is 
null");
 
                if (log.isInfoEnabled()) {
-                       log.info("Adding cartridge: [cartridge-type] " + 
cartridgeConfig.getType());
+                       log.info("Updating cartridge: [cartridge-type] " + 
cartridgeConfig.getType());
                }
                if (log.isDebugEnabled()) {
                        log.debug("Cartridge definition: " + 
cartridgeConfig.toString());
@@ -133,12 +133,9 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
                        throw new InvalidCartridgeDefinitionException(msg, e);
                }
 
-               // TODO transaction begins
                String cartridgeType = cartridge.getType();
-               // Undeploy if already deployed
                if (cloudControllerContext.getCartridge(cartridgeType) != null) 
{
                        Cartridge cartridgeToBeRemoved = 
cloudControllerContext.getCartridge(cartridgeType);
-                       // undeploy
                        try {
                                removeCartridge(cartridgeToBeRemoved.getType());
                        } catch (InvalidCartridgeTypeException ignore) {
@@ -152,15 +149,10 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
                // Add cartridge to the cloud controller context and persist
                CloudControllerContext.getInstance().addCartridge(cartridge);
                CloudControllerContext.getInstance().persist();
-
-               List<Cartridge> cartridgeList = new ArrayList<Cartridge>();
-               cartridgeList.add(cartridge);
-
-               TopologyBuilder.handleServiceCreated(cartridgeList);
                // transaction ends
 
                if (log.isInfoEnabled()) {
-                       log.info("Successfully added cartridge: 
[cartridge-type] " + cartridgeType);
+                       log.info("Successfully updated cartridge: 
[cartridge-type] " + cartridgeType);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Service.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Service.java
 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Service.java
index 116078e..f94e7f8 100644
--- 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Service.java
+++ 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Service.java
@@ -89,12 +89,16 @@ public class Service {
 
     public void addPorts(Collection<Port> ports) {
         for(Port port : ports) {
-            addPort(port);
+            if(!portExists(port)) {
+                addPort(port);
+            }
         }
     }
 
     public void removePort(Port port) {
-        this.portMap.remove(port.getProxy());
+        if(portExists(port)) {
+            this.portMap.remove(port.getProxy());
+        }
     }
 
     public boolean portExists(Port port) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
index 8582f3c..88be323 100644
--- 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
+++ 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
@@ -293,7 +293,16 @@ public class LoadBalancerCommonTopologyEventReceiver 
extends TopologyEventReceiv
             }
             return;
         }
-        topologyProvider.addMember(transformMember(member));
+
+        org.apache.stratos.load.balancer.common.domain.Member lbMember = 
transformMember(member);
+        org.apache.stratos.load.balancer.common.domain.Service lbService = 
topologyProvider.getTopology().
+                getService(serviceName);
+        if(lbService == null) {
+            log.warn(String.format("Service not found: %s", serviceName));
+            return;
+        }
+        lbService.addPorts(lbMember.getPorts());
+        topologyProvider.addMember(lbMember);
     }
 
     /**
@@ -347,9 +356,6 @@ public class LoadBalancerCommonTopologyEventReceiver 
extends TopologyEventReceiv
     private org.apache.stratos.load.balancer.common.domain.Service 
transformService(Service messagingService) {
         org.apache.stratos.load.balancer.common.domain.Service service =
                 new 
org.apache.stratos.load.balancer.common.domain.Service(messagingService.getServiceName());
-        for(Port port : messagingService.getPorts()) {
-            service.addPort(transformPort(port));
-        }
         return service;
     }
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
index c0016fe..c9d2556 100644
--- 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
+++ 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
@@ -19,7 +19,7 @@
 
 package org.apache.stratos.load.balancer.common.statistics;
 
-import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.load.balancer.common.domain.Cluster;
 
 /**
  * Load balancer statistics reader interface.

http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
index 9c4cb88..52f98be 100644
--- 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
+++ 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
@@ -25,10 +25,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.common.statistics.publisher.InFlightRequestPublisher;
 import 
org.apache.stratos.common.statistics.publisher.InFlightRequestPublisherFactory;
 import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType;
+import org.apache.stratos.load.balancer.common.domain.Cluster;
+import org.apache.stratos.load.balancer.common.domain.Service;
 import 
org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.load.balancer.common.topology.TopologyProvider;
 
 /**
  * Load balancer statistics notifier thread for publishing statistics 
periodically to CEP.
@@ -37,13 +37,15 @@ public class LoadBalancerStatisticsNotifier implements 
Runnable {
     private static final Log log = 
LogFactory.getLog(LoadBalancerStatisticsNotifier.class);
 
     private final LoadBalancerStatisticsReader statsReader;
+    private final TopologyProvider topologyProvider;
     private final InFlightRequestPublisher inFlightRequestPublisher;
     private long statsPublisherInterval = 15000;
     private String networkPartitionId;
     private boolean terminated;
 
-    public LoadBalancerStatisticsNotifier(LoadBalancerStatisticsReader 
statsReader) {
+    public LoadBalancerStatisticsNotifier(LoadBalancerStatisticsReader 
statsReader, TopologyProvider topologyProvider) {
         this.statsReader = statsReader;
+        this.topologyProvider = topologyProvider;
         this.inFlightRequestPublisher = 
InFlightRequestPublisherFactory.createInFlightRequestPublisher(
                 StatisticsPublisherType.WSO2CEP);
 
@@ -74,36 +76,26 @@ public class LoadBalancerStatisticsNotifier implements 
Runnable {
                     log.debug("Publishing load balancer statistics");
                 }
                 if (inFlightRequestPublisher.isEnabled()) {
-                    try {
-                        TopologyManager.acquireReadLock();
-                        int requestCount;
-                        int servedRequestCount;
-                        int activeInstancesCount;
-                        for (Service service : 
TopologyManager.getTopology().getServices()) {
-                            for (Cluster cluster : service.getClusters()) {
-                                if (!cluster.isLbCluster()) {
-                                    // Publish in-flight request count of load 
balancer's network partition
-                                    requestCount = 
statsReader.getInFlightRequestCount(cluster.getClusterId());
-                                    servedRequestCount = 
statsReader.getServedRequestCount(cluster.getClusterId());
-                                    if(requestCount == 0) {
-                                        servedRequestCount = 0;
-                                    }
-                                    activeInstancesCount = 
statsReader.getActiveInstancesCount(cluster);
-                                    
inFlightRequestPublisher.publish(cluster.getClusterId(), 
networkPartitionId,activeInstancesCount, requestCount, servedRequestCount);
-                                    log.info(String.format("In-flight request 
count published to cep: [cluster-id] %s [network-partition] %s [value] %d 
[active instances] %d [RIF] %d ",
-                                            cluster.getClusterId(), 
networkPartitionId, servedRequestCount , activeInstancesCount ,requestCount ));
-                                    if (log.isDebugEnabled()) {
-                                        log.debug(String.format("In-flight 
request count published to cep: [cluster-id] %s [network-partition] %s [value] 
%d",
-                                                cluster.getClusterId(), 
networkPartitionId, requestCount));
-                                    }
-                                }
-                                else {
-                                    // Load balancer cluster found in 
topology; we do not need to publish request counts for them.
-                                }
+                    int requestCount;
+                    int servedRequestCount;
+                    int activeInstancesCount;
+                    for (Service service : 
topologyProvider.getTopology().getServices()) {
+                        for (Cluster cluster : service.getClusters()) {
+                            // Publish in-flight request count of load 
balancer's network partition
+                            requestCount = 
statsReader.getInFlightRequestCount(cluster.getClusterId());
+                            servedRequestCount = 
statsReader.getServedRequestCount(cluster.getClusterId());
+                            if (requestCount == 0) {
+                                servedRequestCount = 0;
+                            }
+                            activeInstancesCount = 
statsReader.getActiveInstancesCount(cluster);
+                            
inFlightRequestPublisher.publish(cluster.getClusterId(), networkPartitionId, 
activeInstancesCount, requestCount, servedRequestCount);
+                            log.info(String.format("In-flight request count 
published to cep: [cluster-id] %s [network-partition] %s [value] %d [active 
instances] %d [RIF] %d ",
+                                    cluster.getClusterId(), 
networkPartitionId, servedRequestCount, activeInstancesCount, requestCount));
+                            if (log.isDebugEnabled()) {
+                                log.debug(String.format("In-flight request 
count published to cep: [cluster-id] %s [network-partition] %s [value] %d",
+                                        cluster.getClusterId(), 
networkPartitionId, requestCount));
                             }
                         }
-                    } finally {
-                        TopologyManager.releaseReadLock();
                     }
                 } else if (log.isWarnEnabled()) {
                     log.warn("In-flight request count publisher is disabled");

http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
 
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index 2459b31..ef2dc5b 100644
--- 
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ 
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -62,10 +62,12 @@ public class LoadBalancerExtension {
         * @param loadBalancer Load balancer instance: Mandatory.
         * @param statsReader  Statistics reader: If null statistics notifier 
thread will not be started.
         */
-       public LoadBalancerExtension(LoadBalancer loadBalancer, 
LoadBalancerStatisticsReader statsReader) {
-               this.loadBalancer = loadBalancer;
+       public LoadBalancerExtension(LoadBalancer loadBalancer, 
LoadBalancerStatisticsReader statsReader,
+                                 TopologyProvider topologyProvider) {
+
+        this.loadBalancer = loadBalancer;
                this.statsReader = statsReader;
-        this.topologyProvider = new TopologyProvider();
+        this.topologyProvider = topologyProvider;
        }
 
 
@@ -82,7 +84,7 @@ public class LoadBalancerExtension {
 
                        if (statsReader != null) {
                                // Start stats notifier thread
-                               statisticsNotifier = new 
LoadBalancerStatisticsNotifier(statsReader);
+                               statisticsNotifier = new 
LoadBalancerStatisticsNotifier(statsReader, topologyProvider);
                                Thread statsNotifierThread = new 
Thread(statisticsNotifier);
                                statsNotifierThread.start();
                        } else {
@@ -158,7 +160,7 @@ public class LoadBalancerExtension {
 
                         // Configure load balancer
                         Topology topology = topologyProvider.getTopology();
-                        if(topologyInitialized(topology) && 
loadBalancer.configure(topology)) {
+                        if(topologyPopulated(topology) && 
loadBalancer.configure(topology)) {
                             // Start load balancer
                             loadBalancer.start();
                             loadBalancerStarted = true;
@@ -168,7 +170,7 @@ public class LoadBalancerExtension {
                     if (log.isErrorEnabled()) {
                         log.error("Could not start load balancer", e);
                     }
-                    terminate();
+                    stop();
                 }
             }
         });
@@ -205,11 +207,11 @@ public class LoadBalancerExtension {
        }
 
     /**
-     * Returns true if topology has initialized
+     * Returns true if topology has populated
      * @param topology
      * @return
      */
-    private boolean topologyInitialized(Topology topology) {
+    private boolean topologyPopulated(Topology topology) {
         for(Service service : topology.getServices()) {
             for(Cluster cluster : service.getClusters()) {
                 if(cluster.getMembers().size() > 0) {
@@ -240,15 +242,27 @@ public class LoadBalancerExtension {
        }
 
     /**
-     * Terminate event receivers and publishers.
+     * Stop load balancer instance.
      */
-       public void terminate() {
-               if (topologyEventReceiver != null) {
-                       topologyEventReceiver.terminate();
-               }
-               if (statisticsNotifier != null) {
-                       statisticsNotifier.terminate();
-               }
+       public void stop() {
+        try {
+            if (topologyEventReceiver != null) {
+                topologyEventReceiver.terminate();
+            }
+        } catch (Exception ignore) {
+        }
+
+        try {
+            if (statisticsNotifier != null) {
+                statisticsNotifier.terminate();
+            }
+        } catch (Exception ignore) {
+        }
+
+        try {
+            loadBalancer.stop();
+        } catch (Exception ignore) {
+        }
        }
 
        public ExecutorService getExecutorService() {

http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index fed6e84..33735dc 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -147,7 +147,7 @@ public class LoadBalancerServiceComponent {
 
             if(configuration.isCepStatsPublisherEnabled()) {
                 // Start statistics notifier
-                startStatisticsNotifier();
+                startStatisticsNotifier(topologyProvider);
             }
 
             activated = true;
@@ -217,9 +217,10 @@ public class LoadBalancerServiceComponent {
         }
     }
 
-    private void startStatisticsNotifier() {
+    private void startStatisticsNotifier(TopologyProvider topologyProvider) {
         // Start stats notifier thread
-        statisticsNotifier = new 
LoadBalancerStatisticsNotifier(LoadBalancerStatisticsCollector.getInstance());
+        statisticsNotifier = new 
LoadBalancerStatisticsNotifier(LoadBalancerStatisticsCollector.getInstance(),
+                topologyProvider);
         Thread statsNotifierThread = new Thread(statisticsNotifier);
         statsNotifierThread.start();
         if (log.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
index a5239b4..2e5723f 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
@@ -21,6 +21,7 @@ package org.apache.stratos.load.balancer.statistics;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.load.balancer.common.domain.Cluster;
 import 
org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader;
 
 import java.util.Map;
@@ -93,7 +94,7 @@ public class LoadBalancerStatisticsCollector implements 
LoadBalancerStatisticsRe
         }
     }
 
-    public int 
getActiveInstancesCount(org.apache.stratos.messaging.domain.topology.Cluster 
cluster) {
+    public int getActiveInstancesCount(Cluster cluster) {
         return cluster.getMembers().size();
     }
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxy.java
----------------------------------------------------------------------
diff --git 
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxy.java
 
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxy.java
index 8c0fe6a..18f005d 100644
--- 
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxy.java
+++ 
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxy.java
@@ -151,9 +151,8 @@ public class HAProxy implements LoadBalancer {
             }
         } catch (Exception e) {
             if (log.isErrorEnabled()) {
-                log.error("Could not stop haproxy instance");
+                log.error("Could not stop haproxy instance", e);
             }
-            throw new LoadBalancerExtensionException(e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
----------------------------------------------------------------------
diff --git 
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
 
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
index 5dec4d3..3c2cf36 100644
--- 
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
+++ 
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
@@ -22,12 +22,12 @@ package org.apache.stratos.haproxy.extension;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.common.util.CommandUtils;
+import org.apache.stratos.load.balancer.common.domain.Cluster;
+import org.apache.stratos.load.balancer.common.domain.Member;
+import org.apache.stratos.load.balancer.common.domain.Port;
+import org.apache.stratos.load.balancer.common.domain.Service;
 import 
org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.Port;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.load.balancer.common.topology.TopologyProvider;
 
 import java.io.IOException;
 
@@ -35,14 +35,17 @@ import java.io.IOException;
  * HAProxy statistics reader.
  */
 public class HAProxyStatisticsReader implements LoadBalancerStatisticsReader {
+
     private static final Log log = 
LogFactory.getLog(HAProxyStatisticsReader.class);
 
     private String scriptsPath;
     private String statsSocketFilePath;
+    private TopologyProvider topologyProvider;
 
-    public HAProxyStatisticsReader() {
+    public HAProxyStatisticsReader(TopologyProvider topologyProvider) {
         this.scriptsPath = HAProxyContext.getInstance().getScriptsPath();
         this.statsSocketFilePath = 
HAProxyContext.getInstance().getStatsSocketFilePath();
+        this.topologyProvider = topologyProvider;
     }
 
     @Override
@@ -51,7 +54,7 @@ public class HAProxyStatisticsReader implements 
LoadBalancerStatisticsReader {
         String[] array;
         int totalWeight, weight;
 
-        for (Service service : TopologyManager.getTopology().getServices()) {
+        for (Service service : topologyProvider.getTopology().getServices()) {
             for (Cluster cluster : service.getClusters()) {
                 if (cluster.getClusterId().equals(clusterId)) {
                     totalWeight = 0;
@@ -63,25 +66,23 @@ public class HAProxyStatisticsReader implements 
LoadBalancerStatisticsReader {
                         for(String hostname : cluster.getHostNames()) {
                             backendId = hostname+"-http-members";
                             for (Member member : cluster.getMembers()) {
-                                
if(member.getNetworkPartitionId().equals(HAProxyContext.getInstance().getNetworkPartitionId()))
 {
-                                    // echo "get weight <backend>/<server>" | 
socat stdio <stats-socket>
-                                    command = String.format("%s/get-weight.sh 
%s %s %s", scriptsPath, backendId, member.getMemberId(), statsSocketFilePath);
-                                    try {
-                                        output = 
CommandUtils.executeCommand(command);
-                                        if ((output != null) && 
(output.length() > 0)) {
-                                            array = output.split(" ");
-                                            if ((array != null) && 
(array.length > 0)) {
-                                                weight = 
Integer.parseInt(array[0]);
-                                                if (log.isDebugEnabled()) {
-                                                    
log.debug(String.format("Member weight found: [cluster] %s [member] %s [weight] 
%d", member.getClusterId(), member.getMemberId(), weight));
-                                                }
-                                                totalWeight += weight;
+                                // echo "get weight <backend>/<server>" | 
socat stdio <stats-socket>
+                                command = String.format("%s/get-weight.sh %s 
%s %s", scriptsPath, backendId, member.getMemberId(), statsSocketFilePath);
+                                try {
+                                    output = 
CommandUtils.executeCommand(command);
+                                    if ((output != null) && (output.length() > 
0)) {
+                                        array = output.split(" ");
+                                        if ((array != null) && (array.length > 
0)) {
+                                            weight = 
Integer.parseInt(array[0]);
+                                            if (log.isDebugEnabled()) {
+                                                
log.debug(String.format("Member weight found: [cluster] %s [member] %s [weight] 
%d", member.getClusterId(), member.getMemberId(), weight));
                                             }
+                                            totalWeight += weight;
                                         }
-                                    } catch (IOException e) {
-                                        if (log.isErrorEnabled()) {
-                                            log.error(e);
-                                        }
+                                    }
+                                } catch (IOException e) {
+                                    if (log.isErrorEnabled()) {
+                                        log.error(e);
                                     }
                                 }
                             }

http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
----------------------------------------------------------------------
diff --git 
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
 
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
index c6ee22b..f56541d 100644
--- 
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
+++ 
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.stratos.common.threading.StratosThreadPool;
+import org.apache.stratos.load.balancer.common.topology.TopologyProvider;
 import org.apache.stratos.load.balancer.extension.api.LoadBalancerExtension;
 
 import java.util.concurrent.ExecutorService;
@@ -44,12 +45,31 @@ public class Main {
                        if (log.isInfoEnabled()) {
                                log.info("HAProxy extension started");
                        }
+
+            // Add shutdown hook
+            final Thread mainThread = Thread.currentThread();
+            final LoadBalancerExtension finalExtension = extension;
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                public void run() {
+                    try {
+                        if(finalExtension != null) {
+                            log.info("Shutting haproxy instance...");
+                            finalExtension.stop();
+                        }
+                        mainThread.join();
+                    } catch (Exception e) {
+                        log.error(e);
+                    }
+                }
+            });
+
                        executorService = 
StratosThreadPool.getExecutorService("haproxy.extension.thread.pool", 10);
                        // Validate runtime parameters
                        HAProxyContext.getInstance().validate();
-                       extension = new LoadBalancerExtension(new HAProxy(),
-                                                             
(HAProxyContext.getInstance().isCEPStatsPublisherEnabled() ?
-                                                              new 
HAProxyStatisticsReader() : null));
+            TopologyProvider topologyProvider = new TopologyProvider();
+            HAProxyStatisticsReader statisticsReader = 
HAProxyContext.getInstance().isCEPStatsPublisherEnabled() ?
+                    new HAProxyStatisticsReader(topologyProvider) : null;
+            extension = new LoadBalancerExtension(new HAProxy(), 
statisticsReader, topologyProvider);
                        extension.setExecutorService(executorService);
                        extension.execute();
                } catch (Exception e) {
@@ -57,7 +77,8 @@ public class Main {
                                log.error(e);
                        }
                        if (extension != null) {
-                               extension.terminate();
+                log.info("Shutting haproxy instance...");
+                               extension.stop();
                        }
                }
        }

Reply via email to