Repository: stratos Updated Branches: refs/heads/master 5c502b266 -> de00e1fa3
Updating load balancer statistics reader to user topology provider Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/de00e1fa Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/de00e1fa Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/de00e1fa Branch: refs/heads/master Commit: de00e1fa3b76505a469f6536c8d0fac2bfe57b87 Parents: 5c502b2 Author: Imesh Gunaratne <[email protected]> Authored: Sun Mar 8 10:08:23 2015 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Sun Mar 8 10:08:45 2015 +0530 ---------------------------------------------------------------------- .../impl/CloudControllerServiceImpl.java | 12 +---- .../load/balancer/common/domain/Service.java | 8 ++- ...LoadBalancerCommonTopologyEventReceiver.java | 14 +++-- .../LoadBalancerStatisticsReader.java | 2 +- .../LoadBalancerStatisticsNotifier.java | 56 +++++++++----------- .../extension/api/LoadBalancerExtension.java | 46 ++++++++++------ .../internal/LoadBalancerServiceComponent.java | 7 +-- .../LoadBalancerStatisticsCollector.java | 3 +- .../stratos/haproxy/extension/HAProxy.java | 3 +- .../extension/HAProxyStatisticsReader.java | 49 ++++++++--------- .../apache/stratos/haproxy/extension/Main.java | 29 ++++++++-- 11 files changed, 130 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java index 31eca08..d351134 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java @@ -118,7 +118,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { handleNullObject(cartridgeConfig, "Cartridge definition is null"); if (log.isInfoEnabled()) { - log.info("Adding cartridge: [cartridge-type] " + cartridgeConfig.getType()); + log.info("Updating cartridge: [cartridge-type] " + cartridgeConfig.getType()); } if (log.isDebugEnabled()) { log.debug("Cartridge definition: " + cartridgeConfig.toString()); @@ -133,12 +133,9 @@ public class CloudControllerServiceImpl implements CloudControllerService { throw new InvalidCartridgeDefinitionException(msg, e); } - // TODO transaction begins String cartridgeType = cartridge.getType(); - // Undeploy if already deployed if (cloudControllerContext.getCartridge(cartridgeType) != null) { Cartridge cartridgeToBeRemoved = cloudControllerContext.getCartridge(cartridgeType); - // undeploy try { removeCartridge(cartridgeToBeRemoved.getType()); } catch (InvalidCartridgeTypeException ignore) { @@ -152,15 +149,10 @@ public class CloudControllerServiceImpl implements CloudControllerService { // Add cartridge to the cloud controller context and persist CloudControllerContext.getInstance().addCartridge(cartridge); CloudControllerContext.getInstance().persist(); - - List<Cartridge> cartridgeList = new ArrayList<Cartridge>(); - cartridgeList.add(cartridge); - - TopologyBuilder.handleServiceCreated(cartridgeList); // transaction ends if (log.isInfoEnabled()) { - log.info("Successfully added cartridge: [cartridge-type] " + cartridgeType); + log.info("Successfully updated cartridge: [cartridge-type] " + cartridgeType); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Service.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Service.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Service.java index 116078e..f94e7f8 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Service.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Service.java @@ -89,12 +89,16 @@ public class Service { public void addPorts(Collection<Port> ports) { for(Port port : ports) { - addPort(port); + if(!portExists(port)) { + addPort(port); + } } } public void removePort(Port port) { - this.portMap.remove(port.getProxy()); + if(portExists(port)) { + this.portMap.remove(port.getProxy()); + } } public boolean portExists(Port port) { http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java index 8582f3c..88be323 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java @@ -293,7 +293,16 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv } return; } - topologyProvider.addMember(transformMember(member)); + + org.apache.stratos.load.balancer.common.domain.Member lbMember = transformMember(member); + org.apache.stratos.load.balancer.common.domain.Service lbService = topologyProvider.getTopology(). + getService(serviceName); + if(lbService == null) { + log.warn(String.format("Service not found: %s", serviceName)); + return; + } + lbService.addPorts(lbMember.getPorts()); + topologyProvider.addMember(lbMember); } /** @@ -347,9 +356,6 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv private org.apache.stratos.load.balancer.common.domain.Service transformService(Service messagingService) { org.apache.stratos.load.balancer.common.domain.Service service = new org.apache.stratos.load.balancer.common.domain.Service(messagingService.getServiceName()); - for(Port port : messagingService.getPorts()) { - service.addPort(transformPort(port)); - } return service; } http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java index c0016fe..c9d2556 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java @@ -19,7 +19,7 @@ package org.apache.stratos.load.balancer.common.statistics; -import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.load.balancer.common.domain.Cluster; /** * Load balancer statistics reader interface. http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java index 9c4cb88..52f98be 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java @@ -25,10 +25,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.stratos.common.statistics.publisher.InFlightRequestPublisher; import org.apache.stratos.common.statistics.publisher.InFlightRequestPublisherFactory; import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType; +import org.apache.stratos.load.balancer.common.domain.Cluster; +import org.apache.stratos.load.balancer.common.domain.Service; import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader; -import org.apache.stratos.messaging.domain.topology.Cluster; -import org.apache.stratos.messaging.domain.topology.Service; -import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; +import org.apache.stratos.load.balancer.common.topology.TopologyProvider; /** * Load balancer statistics notifier thread for publishing statistics periodically to CEP. @@ -37,13 +37,15 @@ public class LoadBalancerStatisticsNotifier implements Runnable { private static final Log log = LogFactory.getLog(LoadBalancerStatisticsNotifier.class); private final LoadBalancerStatisticsReader statsReader; + private final TopologyProvider topologyProvider; private final InFlightRequestPublisher inFlightRequestPublisher; private long statsPublisherInterval = 15000; private String networkPartitionId; private boolean terminated; - public LoadBalancerStatisticsNotifier(LoadBalancerStatisticsReader statsReader) { + public LoadBalancerStatisticsNotifier(LoadBalancerStatisticsReader statsReader, TopologyProvider topologyProvider) { this.statsReader = statsReader; + this.topologyProvider = topologyProvider; this.inFlightRequestPublisher = InFlightRequestPublisherFactory.createInFlightRequestPublisher( StatisticsPublisherType.WSO2CEP); @@ -74,36 +76,26 @@ public class LoadBalancerStatisticsNotifier implements Runnable { log.debug("Publishing load balancer statistics"); } if (inFlightRequestPublisher.isEnabled()) { - try { - TopologyManager.acquireReadLock(); - int requestCount; - int servedRequestCount; - int activeInstancesCount; - for (Service service : TopologyManager.getTopology().getServices()) { - for (Cluster cluster : service.getClusters()) { - if (!cluster.isLbCluster()) { - // Publish in-flight request count of load balancer's network partition - requestCount = statsReader.getInFlightRequestCount(cluster.getClusterId()); - servedRequestCount = statsReader.getServedRequestCount(cluster.getClusterId()); - if(requestCount == 0) { - servedRequestCount = 0; - } - activeInstancesCount = statsReader.getActiveInstancesCount(cluster); - inFlightRequestPublisher.publish(cluster.getClusterId(), networkPartitionId,activeInstancesCount, requestCount, servedRequestCount); - log.info(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d [active instances] %d [RIF] %d ", - cluster.getClusterId(), networkPartitionId, servedRequestCount , activeInstancesCount ,requestCount )); - if (log.isDebugEnabled()) { - log.debug(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d", - cluster.getClusterId(), networkPartitionId, requestCount)); - } - } - else { - // Load balancer cluster found in topology; we do not need to publish request counts for them. - } + int requestCount; + int servedRequestCount; + int activeInstancesCount; + for (Service service : topologyProvider.getTopology().getServices()) { + for (Cluster cluster : service.getClusters()) { + // Publish in-flight request count of load balancer's network partition + requestCount = statsReader.getInFlightRequestCount(cluster.getClusterId()); + servedRequestCount = statsReader.getServedRequestCount(cluster.getClusterId()); + if (requestCount == 0) { + servedRequestCount = 0; + } + activeInstancesCount = statsReader.getActiveInstancesCount(cluster); + inFlightRequestPublisher.publish(cluster.getClusterId(), networkPartitionId, activeInstancesCount, requestCount, servedRequestCount); + log.info(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d [active instances] %d [RIF] %d ", + cluster.getClusterId(), networkPartitionId, servedRequestCount, activeInstancesCount, requestCount)); + if (log.isDebugEnabled()) { + log.debug(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d", + cluster.getClusterId(), networkPartitionId, requestCount)); } } - } finally { - TopologyManager.releaseReadLock(); } } else if (log.isWarnEnabled()) { log.warn("In-flight request count publisher is disabled"); http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java index 2459b31..ef2dc5b 100644 --- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java +++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java @@ -62,10 +62,12 @@ public class LoadBalancerExtension { * @param loadBalancer Load balancer instance: Mandatory. * @param statsReader Statistics reader: If null statistics notifier thread will not be started. */ - public LoadBalancerExtension(LoadBalancer loadBalancer, LoadBalancerStatisticsReader statsReader) { - this.loadBalancer = loadBalancer; + public LoadBalancerExtension(LoadBalancer loadBalancer, LoadBalancerStatisticsReader statsReader, + TopologyProvider topologyProvider) { + + this.loadBalancer = loadBalancer; this.statsReader = statsReader; - this.topologyProvider = new TopologyProvider(); + this.topologyProvider = topologyProvider; } @@ -82,7 +84,7 @@ public class LoadBalancerExtension { if (statsReader != null) { // Start stats notifier thread - statisticsNotifier = new LoadBalancerStatisticsNotifier(statsReader); + statisticsNotifier = new LoadBalancerStatisticsNotifier(statsReader, topologyProvider); Thread statsNotifierThread = new Thread(statisticsNotifier); statsNotifierThread.start(); } else { @@ -158,7 +160,7 @@ public class LoadBalancerExtension { // Configure load balancer Topology topology = topologyProvider.getTopology(); - if(topologyInitialized(topology) && loadBalancer.configure(topology)) { + if(topologyPopulated(topology) && loadBalancer.configure(topology)) { // Start load balancer loadBalancer.start(); loadBalancerStarted = true; @@ -168,7 +170,7 @@ public class LoadBalancerExtension { if (log.isErrorEnabled()) { log.error("Could not start load balancer", e); } - terminate(); + stop(); } } }); @@ -205,11 +207,11 @@ public class LoadBalancerExtension { } /** - * Returns true if topology has initialized + * Returns true if topology has populated * @param topology * @return */ - private boolean topologyInitialized(Topology topology) { + private boolean topologyPopulated(Topology topology) { for(Service service : topology.getServices()) { for(Cluster cluster : service.getClusters()) { if(cluster.getMembers().size() > 0) { @@ -240,15 +242,27 @@ public class LoadBalancerExtension { } /** - * Terminate event receivers and publishers. + * Stop load balancer instance. */ - public void terminate() { - if (topologyEventReceiver != null) { - topologyEventReceiver.terminate(); - } - if (statisticsNotifier != null) { - statisticsNotifier.terminate(); - } + public void stop() { + try { + if (topologyEventReceiver != null) { + topologyEventReceiver.terminate(); + } + } catch (Exception ignore) { + } + + try { + if (statisticsNotifier != null) { + statisticsNotifier.terminate(); + } + } catch (Exception ignore) { + } + + try { + loadBalancer.stop(); + } catch (Exception ignore) { + } } public ExecutorService getExecutorService() { http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java index fed6e84..33735dc 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java @@ -147,7 +147,7 @@ public class LoadBalancerServiceComponent { if(configuration.isCepStatsPublisherEnabled()) { // Start statistics notifier - startStatisticsNotifier(); + startStatisticsNotifier(topologyProvider); } activated = true; @@ -217,9 +217,10 @@ public class LoadBalancerServiceComponent { } } - private void startStatisticsNotifier() { + private void startStatisticsNotifier(TopologyProvider topologyProvider) { // Start stats notifier thread - statisticsNotifier = new LoadBalancerStatisticsNotifier(LoadBalancerStatisticsCollector.getInstance()); + statisticsNotifier = new LoadBalancerStatisticsNotifier(LoadBalancerStatisticsCollector.getInstance(), + topologyProvider); Thread statsNotifierThread = new Thread(statisticsNotifier); statsNotifierThread.start(); if (log.isInfoEnabled()) { http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java index a5239b4..2e5723f 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java @@ -21,6 +21,7 @@ package org.apache.stratos.load.balancer.statistics; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.load.balancer.common.domain.Cluster; import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader; import java.util.Map; @@ -93,7 +94,7 @@ public class LoadBalancerStatisticsCollector implements LoadBalancerStatisticsRe } } - public int getActiveInstancesCount(org.apache.stratos.messaging.domain.topology.Cluster cluster) { + public int getActiveInstancesCount(Cluster cluster) { return cluster.getMembers().size(); } http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxy.java ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxy.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxy.java index 8c0fe6a..18f005d 100644 --- a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxy.java +++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxy.java @@ -151,9 +151,8 @@ public class HAProxy implements LoadBalancer { } } catch (Exception e) { if (log.isErrorEnabled()) { - log.error("Could not stop haproxy instance"); + log.error("Could not stop haproxy instance", e); } - throw new LoadBalancerExtensionException(e); } } } http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java index 5dec4d3..3c2cf36 100644 --- a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java +++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java @@ -22,12 +22,12 @@ package org.apache.stratos.haproxy.extension; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.common.util.CommandUtils; +import org.apache.stratos.load.balancer.common.domain.Cluster; +import org.apache.stratos.load.balancer.common.domain.Member; +import org.apache.stratos.load.balancer.common.domain.Port; +import org.apache.stratos.load.balancer.common.domain.Service; import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader; -import org.apache.stratos.messaging.domain.topology.Cluster; -import org.apache.stratos.messaging.domain.topology.Member; -import org.apache.stratos.messaging.domain.topology.Port; -import org.apache.stratos.messaging.domain.topology.Service; -import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; +import org.apache.stratos.load.balancer.common.topology.TopologyProvider; import java.io.IOException; @@ -35,14 +35,17 @@ import java.io.IOException; * HAProxy statistics reader. */ public class HAProxyStatisticsReader implements LoadBalancerStatisticsReader { + private static final Log log = LogFactory.getLog(HAProxyStatisticsReader.class); private String scriptsPath; private String statsSocketFilePath; + private TopologyProvider topologyProvider; - public HAProxyStatisticsReader() { + public HAProxyStatisticsReader(TopologyProvider topologyProvider) { this.scriptsPath = HAProxyContext.getInstance().getScriptsPath(); this.statsSocketFilePath = HAProxyContext.getInstance().getStatsSocketFilePath(); + this.topologyProvider = topologyProvider; } @Override @@ -51,7 +54,7 @@ public class HAProxyStatisticsReader implements LoadBalancerStatisticsReader { String[] array; int totalWeight, weight; - for (Service service : TopologyManager.getTopology().getServices()) { + for (Service service : topologyProvider.getTopology().getServices()) { for (Cluster cluster : service.getClusters()) { if (cluster.getClusterId().equals(clusterId)) { totalWeight = 0; @@ -63,25 +66,23 @@ public class HAProxyStatisticsReader implements LoadBalancerStatisticsReader { for(String hostname : cluster.getHostNames()) { backendId = hostname+"-http-members"; for (Member member : cluster.getMembers()) { - if(member.getNetworkPartitionId().equals(HAProxyContext.getInstance().getNetworkPartitionId())) { - // echo "get weight <backend>/<server>" | socat stdio <stats-socket> - command = String.format("%s/get-weight.sh %s %s %s", scriptsPath, backendId, member.getMemberId(), statsSocketFilePath); - try { - output = CommandUtils.executeCommand(command); - if ((output != null) && (output.length() > 0)) { - array = output.split(" "); - if ((array != null) && (array.length > 0)) { - weight = Integer.parseInt(array[0]); - if (log.isDebugEnabled()) { - log.debug(String.format("Member weight found: [cluster] %s [member] %s [weight] %d", member.getClusterId(), member.getMemberId(), weight)); - } - totalWeight += weight; + // echo "get weight <backend>/<server>" | socat stdio <stats-socket> + command = String.format("%s/get-weight.sh %s %s %s", scriptsPath, backendId, member.getMemberId(), statsSocketFilePath); + try { + output = CommandUtils.executeCommand(command); + if ((output != null) && (output.length() > 0)) { + array = output.split(" "); + if ((array != null) && (array.length > 0)) { + weight = Integer.parseInt(array[0]); + if (log.isDebugEnabled()) { + log.debug(String.format("Member weight found: [cluster] %s [member] %s [weight] %d", member.getClusterId(), member.getMemberId(), weight)); } + totalWeight += weight; } - } catch (IOException e) { - if (log.isErrorEnabled()) { - log.error(e); - } + } + } catch (IOException e) { + if (log.isErrorEnabled()) { + log.error(e); } } } http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java index c6ee22b..f56541d 100644 --- a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java +++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.PropertyConfigurator; import org.apache.stratos.common.threading.StratosThreadPool; +import org.apache.stratos.load.balancer.common.topology.TopologyProvider; import org.apache.stratos.load.balancer.extension.api.LoadBalancerExtension; import java.util.concurrent.ExecutorService; @@ -44,12 +45,31 @@ public class Main { if (log.isInfoEnabled()) { log.info("HAProxy extension started"); } + + // Add shutdown hook + final Thread mainThread = Thread.currentThread(); + final LoadBalancerExtension finalExtension = extension; + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { + try { + if(finalExtension != null) { + log.info("Shutting haproxy instance..."); + finalExtension.stop(); + } + mainThread.join(); + } catch (Exception e) { + log.error(e); + } + } + }); + executorService = StratosThreadPool.getExecutorService("haproxy.extension.thread.pool", 10); // Validate runtime parameters HAProxyContext.getInstance().validate(); - extension = new LoadBalancerExtension(new HAProxy(), - (HAProxyContext.getInstance().isCEPStatsPublisherEnabled() ? - new HAProxyStatisticsReader() : null)); + TopologyProvider topologyProvider = new TopologyProvider(); + HAProxyStatisticsReader statisticsReader = HAProxyContext.getInstance().isCEPStatsPublisherEnabled() ? + new HAProxyStatisticsReader(topologyProvider) : null; + extension = new LoadBalancerExtension(new HAProxy(), statisticsReader, topologyProvider); extension.setExecutorService(executorService); extension.execute(); } catch (Exception e) { @@ -57,7 +77,8 @@ public class Main { log.error(e); } if (extension != null) { - extension.terminate(); + log.info("Shutting haproxy instance..."); + extension.stop(); } } }
