Repository: stratos Updated Branches: refs/heads/4.0.0-grouping 3b713db3a -> ef2aafa1b
adding status checker and monitor support for autoscaler Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/0e1c582f Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/0e1c582f Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/0e1c582f Branch: refs/heads/4.0.0-grouping Commit: 0e1c582fa6b7b2dc4832b406a0c519762f18efac Parents: 8db3680 Author: reka <[email protected]> Authored: Thu Sep 18 16:24:44 2014 +0530 Committer: reka <[email protected]> Committed: Thu Sep 18 16:24:44 2014 +0530 ---------------------------------------------------------------------- .../stratos/autoscaler/AutoscalerContext.java | 7 + .../autoscaler/grouping/DependencyBuilder.java | 39 +++++ .../AutoscalerTopologyEventReceiver.java | 34 +++- .../stratos/autoscaler/monitor/Monitor.java | 30 +++- .../monitor/application/ApplicationMonitor.java | 125 ++++++++------- .../monitor/cluster/ClusterMonitor.java | 23 --- .../autoscaler/monitor/group/GroupMonitor.java | 8 + .../status/checker/ClusterStatusChecker.java | 30 ++-- .../status/checker/GroupStatusChecker.java | 8 +- .../status/checker/StatusChecker.java | 156 +++++++++++++++++-- .../stratos/autoscaler/util/AutoscalerUtil.java | 3 + .../messaging/domain/topology/Application.java | 10 ++ .../messaging/domain/topology/Group.java | 10 ++ .../domain/topology/ParentBehavior.java | 4 + .../domain/topology/util/GroupStatus.java | 40 +++++ .../event/topology/ClusterActivatedEvent.java | 12 +- .../event/topology/GroupActivatedEvent.java | 16 ++ .../topology/TopologyMessageProcessorChain.java | 6 - 18 files changed, 444 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/0e1c582f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java ---------------------------------------------------------------------- diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java index 4ea6b28..2854f13 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/AutoscalerContext.java @@ -26,6 +26,7 @@ import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitor; import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor; import org.apache.stratos.autoscaler.monitor.group.GroupMonitor; import org.apache.stratos.autoscaler.monitor.cluster.LbClusterMonitor; +import org.apache.stratos.autoscaler.status.checker.StatusChecker; import java.util.HashMap; import java.util.Map; @@ -47,6 +48,9 @@ public class AutoscalerContext { // Map<ClusterId, ClusterMonitor> private Map<String, ClusterMonitor> monitors; + + // Map<ClusterId, ClusterMonitor> + private Map<String, StatusChecker> statusCheckers; // Map<LBClusterId, LBClusterMonitor> private Map<String, LbClusterMonitor> lbMonitors; @@ -74,6 +78,9 @@ public class AutoscalerContext { this.groupMonitors = groupMonitors; } + public Map<String, StatusChecker> getStatusCheckers() { + return statusCheckers; + } private static class Holder { private static final AutoscalerContext INSTANCE = new AutoscalerContext(); http://git-wip-us.apache.org/repos/asf/stratos/blob/0e1c582f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/DependencyBuilder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/DependencyBuilder.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/DependencyBuilder.java index 
0566e49..00bf9b4 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/DependencyBuilder.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/DependencyBuilder.java @@ -18,9 +18,48 @@ */ package org.apache.stratos.autoscaler.grouping; +import org.apache.stratos.messaging.domain.topology.Application; +import org.apache.stratos.messaging.domain.topology.DependencyOrder; +import org.apache.stratos.messaging.domain.topology.StartupOrder; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.Set; + /** * This is to build the startup/termination dependencies * across all the groups and clusters */ public class DependencyBuilder { + + public static Queue<String> getStartupOrder(Application application) { + + Queue<String> startup = new LinkedList<String>(); + DependencyOrder dependencyOrder = application.getDependencyOrder(); + Set<StartupOrder> startupOrderSet = dependencyOrder.getStartupOrders(); + for (StartupOrder startupOrder : startupOrderSet) { + + String start = startupOrder.getStart(); + String after = startupOrder.getAfter(); + + if (!startup.contains(start)) { + startup.add(start); + if (!startup.contains(after)) { + startup.add(after); + + } else { + //TODO throw exception since after is there before start + } + } else { + if (!startup.contains(after)) { + startup.add(after); + } else { + //TODO throw exception since start and after already there + } + } + } + return startup; + + } + } http://git-wip-us.apache.org/repos/asf/stratos/blob/0e1c582f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java index 9ada44e..d86bdb3 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java @@ -29,6 +29,7 @@ import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor; import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitor; import org.apache.stratos.autoscaler.partition.PartitionManager; import org.apache.stratos.autoscaler.policy.PolicyManager; +import org.apache.stratos.autoscaler.status.checker.StatusChecker; import org.apache.stratos.autoscaler.util.AutoscalerUtil; import org.apache.stratos.messaging.domain.topology.Application; import org.apache.stratos.messaging.domain.topology.Cluster; @@ -117,7 +118,7 @@ public class AutoscalerTopologyEventReceiver implements Runnable { //TODO build dependency and organize the application //start the application monitor - startApplicationMonitor(applicationCreatedEvent.getApplication()); + // startApplicationMonitor(applicationCreatedEvent.getApplication()); } finally { //release read lock @@ -127,6 +128,34 @@ public class AutoscalerTopologyEventReceiver implements Runnable { } }); + topologyEventReceiver.addEventListener(new GroupActivatedEventListener() { + @Override + protected void onEvent(Event event) { + + log.info("[GroupActivatedEvent] Received: " + event.getClass()); + GroupActivatedEvent groupActivatedEvent = (GroupActivatedEvent) event; + + //trigger status checker + //StatusChecker.getInstance().onGroupStatusChange(groupActivatedEvent.getGroupId(), + //groupActivatedEvent.getAppId()); + + } + }); + + topologyEventReceiver.addEventListener(new ClusterActivatedEventListener() { + @Override + 
protected void onEvent(Event event) { + + log.info("[ClusterActivatedEvent] Received: " + event.getClass()); + ClusterActivatedEvent clusterActivatedEvent = (ClusterActivatedEvent) event; + + //trigger status checker + /*StatusChecker.getInstance().onClusterStatusChange(clusterActivatedEvent.getClusterId(), + clusterActivatedEvent.getAppId()); +*/ + } + }); + topologyEventReceiver.addEventListener(new ApplicationRemovedEventListener() { @Override protected void onEvent(Event event) { @@ -383,6 +412,9 @@ public class AutoscalerTopologyEventReceiver implements Runnable { // partitionContext.incrementCurrentActiveMemberCount(1); partitionContext.movePendingMemberToActiveMembers(memberId); + //trigger status checker + StatusChecker.getInstance().onMemberStatusChange(e.getClusterId()); + } catch (Exception e) { log.error("Error processing event", e); } finally { http://git-wip-us.apache.org/repos/asf/stratos/blob/0e1c582f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/Monitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/Monitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/Monitor.java index f375e1a..2291c06 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/Monitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/Monitor.java @@ -20,6 +20,7 @@ package org.apache.stratos.autoscaler.monitor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.AutoscalerContext; import org.apache.stratos.autoscaler.exception.PartitionValidationException; import org.apache.stratos.autoscaler.exception.PolicyValidationException; import 
org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor; @@ -29,6 +30,7 @@ import org.apache.stratos.autoscaler.status.checker.StatusChecker; import org.apache.stratos.autoscaler.util.AutoscalerUtil; import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Group; +import org.apache.stratos.messaging.event.Event; import java.util.Map; import java.util.Observable; @@ -60,6 +62,14 @@ public abstract class Monitor implements Observer, Runnable { return abstractClusterMonitors; } + public void addAbstractMonitor(AbstractClusterMonitor monitor) { + this.abstractClusterMonitors.put(monitor.getClusterId(), monitor); + } + + public AbstractClusterMonitor getAbstractMonitor(String clusterId) { + return this.abstractClusterMonitors.get(clusterId); + } + public void setAbstractClusterMonitors(Map<String, AbstractClusterMonitor> abstractClusterMonitors) { this.abstractClusterMonitors = abstractClusterMonitors; } @@ -67,10 +77,22 @@ public abstract class Monitor implements Observer, Runnable { public abstract void monitor(); @Override - public void update(Observable observable, Object o) { - + public void update(Observable observable, Object arg) { + if(arg instanceof Event) { + Event event = (Event) arg; + if(log.isDebugEnabled()) { + log.debug(String.format("Event received: %s", event.getClass().getName())); + } + onEvent(event); + } } + /** + * Triggered when an event is received. 
+ * @param event + */ + protected abstract void onEvent(Event event); + public String getId() { return this.id; } @@ -171,7 +193,7 @@ public abstract class Monitor implements Observer, Runnable { Thread th = new Thread(monitor); th.start(); - //AutoscalerContext.getInstance().addMonitor(monitor); + AutoscalerContext.getInstance().addMonitor(monitor); abstractClusterMonitors.put(cluster.getClusterId(), monitor); if (log.isInfoEnabled()) { log.info(String.format("Cluster monitor has been added successfully: [cluster] %s", @@ -268,7 +290,7 @@ public abstract class Monitor implements Observer, Runnable { Thread th = new Thread(monitor); th.start(); - //AutoscalerContext.getInstance().addLbMonitor(monitor); + AutoscalerContext.getInstance().addLbMonitor(monitor); abstractClusterMonitors.put(cluster.getClusterId(), monitor); if (log.isInfoEnabled()) { log.info(String.format("LB Cluster monitor has been added successfully: [cluster] %s", http://git-wip-us.apache.org/repos/asf/stratos/blob/0e1c582f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java index 7e27fa6..0b248f2 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java @@ -20,50 +20,48 @@ package org.apache.stratos.autoscaler.monitor.application; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import 
org.apache.stratos.autoscaler.grouping.DependencyBuilder; import org.apache.stratos.autoscaler.monitor.Monitor; +import org.apache.stratos.autoscaler.monitor.group.GroupMonitor; import org.apache.stratos.autoscaler.status.checker.StatusChecker; import org.apache.stratos.messaging.domain.topology.*; +import org.apache.stratos.messaging.domain.topology.util.GroupStatus; +import org.apache.stratos.messaging.event.Event; import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import java.util.*; +/** + * ApplicationMonitor is to control the child monitors + */ public class ApplicationMonitor extends Monitor { private static final Log log = LogFactory.getLog(ApplicationMonitor.class); private Application application; private Queue<String> preOrderTraverse = new LinkedList<String>(); - private Queue<Cluster> clusters = new LinkedList<Cluster>(); - Map<String, StatusChecker> statusCheckerMap = new HashMap<String, StatusChecker>(); - - private Queue<Group> groups = new LinkedList<Group>(); - public ApplicationMonitor(Application application) { this.application = application; //TODO build dependencies and keep them here - DependencyOrder dependencyOrder = application.getDependencyOrder(); - Set<StartupOrder> startupOrderSet = dependencyOrder.getStartupOrders(); - for(StartupOrder startupOrder: startupOrderSet) { + startDependency(); - String start = startupOrder.getStart(); - String after = startupOrder.getAfter(); + } - if (!preOrderTraverse.contains(start)) { - preOrderTraverse.add(start); - if (!preOrderTraverse.contains(after)) { - preOrderTraverse.add(after); + @Override + public void update(Observable observable, Object arg) { + if(arg instanceof Event) { - } else { - //TODO throw exception since after is there before start - } - } else { - if (!preOrderTraverse.contains(after)) { - preOrderTraverse.add(after); - } else { - //TODO throw exception since start and after already there - } - } } + } + + @Override + protected void onEvent(Event 
event) { + + } + + public void startDependency() { + preOrderTraverse = DependencyBuilder.getStartupOrder(application); + //TODO find out the parallel ones //start the first dependency @@ -76,52 +74,73 @@ public class ApplicationMonitor extends Monitor { TopologyManager.acquireReadLock(); cluster = TopologyManager.getTopology().getService(dependency).getCluster(clusterId); TopologyManager.releaseReadLock(); - if(cluster != null) { - startClusterMonitor(cluster); - } else { - //TODO throw exception since Topology is inconsistent - } + if(cluster != null) { + startClusterMonitor(cluster); + } else { + //TODO throw exception since Topology is inconsistent + } } } + /** + * Find the group monitor by traversing recursively in the hierarchical monitors. + * @param id the unique alias of the Group + * @return the found GroupMonitor + */ + public Monitor findGroupMonitorWithId(String id) { + return findGroupMonitor(id, groupMonitors.values()); - //start the least dependent cluster monitor as part of the applicationCreatedEvent - public void registerFirstClusterMonitor() { - //build dependency tree - - - - - //traverse dependency tree and find the clusters to be started and - // register the correct GroupMonitor or ClusterMonitor - //startGroupMonitor(groups.peek()); - //groups.poll(); - + } + private Monitor findGroupMonitor(String id, Collection<GroupMonitor> monitors) { + for (GroupMonitor monitor : monitors) { + // check if alias is equal, if so, return + if (monitor.equals(id)) { + return monitor; + } else { + // check if this Group has nested sub Groups. 
If so, traverse them as well + if (monitor.getGroupMonitors() != null) { + return findGroupMonitor(id, monitor.getGroupMonitors().values()); + } + } + } + return null; + } + public Monitor findParentOfGroup(String groupId) { + return findParentMonitor(groupId, this); + } + private Monitor findParentMonitor(String groupId, Monitor monitor) { + //if this monitor has the group, return it as the parent + if(monitor.getGroupMonitors().containsKey(groupId)) { + return monitor; + } else { + if(monitor.getGroupMonitors() != null) { + //check whether the children has the group and find its parent + for(GroupMonitor groupMonitor : monitor.getGroupMonitors().values()) { + return findParentMonitor(groupId, groupMonitor); + } + } + } + return null; } - public void startMonitor() { - - } @Override public void run() { - while (true ) { //TODO add the correct status + while (true) { if (log.isDebugEnabled()) { - log.debug("App monitor is running.. " + this.toString()); + log.debug("Application monitor is running.. 
" + this.toString()); } - - - + monitor(); try { - // TODO make this configurable - Thread.sleep(30000); - } catch (InterruptedException ignore) { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); } } } @@ -129,7 +148,9 @@ public class ApplicationMonitor extends Monitor { @Override public void monitor() { + startDependency(); + //evaluate dependency } public Queue<String> getPreOrderTraverse() { http://git-wip-us.apache.org/repos/asf/stratos/blob/0e1c582f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java index 9558384..1cd6d9f 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java @@ -124,30 +124,7 @@ public class ClusterMonitor extends AbstractClusterMonitor { public void monitor() { //TODO make this concurrent - /*final ClusterMonitor clusterMonitor = this; - - Runnable checkClusterStatus = new Runnable() { - @Override - public void run() { - boolean clusterActive = false; - for (NetworkPartitionContext networkPartitionContext : networkPartitionCtxts.values()) { - //minimum check per partition - for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts().values()) { - if(partitionContext.getMinimumMemberCount() == partitionContext.getActiveMemberCount()) { - clusterActive = true; - } - clusterActive = false; - } - - } - // if active then notify upper layer - if(clusterActive) { - 
clusterMonitor.setStatus(ClusterStatus.Active); - } - - } - };*/ for (NetworkPartitionContext networkPartitionContext : networkPartitionCtxts.values()) { // store primary members in the network partition context List<String> primaryMemberListInNetworkPartition = new ArrayList<String>(); http://git-wip-us.apache.org/repos/asf/stratos/blob/0e1c582f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/group/GroupMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/group/GroupMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/group/GroupMonitor.java index 185442e..1688b82 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/group/GroupMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/group/GroupMonitor.java @@ -20,6 +20,7 @@ package org.apache.stratos.autoscaler.monitor.group; import org.apache.stratos.autoscaler.monitor.Monitor; import org.apache.stratos.messaging.domain.topology.Group; +import org.apache.stratos.messaging.event.Event; import java.util.List; import java.util.Map; @@ -30,6 +31,8 @@ import java.util.Map; */ public class GroupMonitor extends Monitor { + + public GroupMonitor(Group group) { } @@ -40,6 +43,11 @@ public class GroupMonitor extends Monitor { } + @Override + protected void onEvent(Event event) { + + } + @Override public void run() { http://git-wip-us.apache.org/repos/asf/stratos/blob/0e1c582f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/ClusterStatusChecker.java ---------------------------------------------------------------------- diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/ClusterStatusChecker.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/ClusterStatusChecker.java index dd41767..a47ee7f 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/ClusterStatusChecker.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/ClusterStatusChecker.java @@ -18,22 +18,28 @@ */ package org.apache.stratos.autoscaler.status.checker; + +import org.apache.stratos.autoscaler.AutoscalerContext; +import org.apache.stratos.autoscaler.NetworkPartitionContext; +import org.apache.stratos.autoscaler.PartitionContext; +import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitor; +import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor; +import org.apache.stratos.autoscaler.monitor.group.GroupMonitor; +import org.apache.stratos.messaging.domain.topology.*; +import org.apache.stratos.messaging.domain.topology.util.GroupStatus; +import org.apache.stratos.messaging.event.topology.ClusterActivatedEvent; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; + +import java.util.Collection; +import java.util.Map; + /** * Cluster status checker will periodically check the cluster status * and notify the interested parties */ -public class ClusterStatusChecker extends StatusChecker { - private String appId; - private String groupId; - private String clsuterId; +public class ClusterStatusChecker { + - public ClusterStatusChecker(String appId, String groupId, String clusterId) { - this.appId = appId; - this.groupId = groupId; - this.clsuterId = clusterId; - } - @Override - public void run() { + - } } 
http://git-wip-us.apache.org/repos/asf/stratos/blob/0e1c582f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/GroupStatusChecker.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/GroupStatusChecker.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/GroupStatusChecker.java index ea8e56c..f69c2d6 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/GroupStatusChecker.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/GroupStatusChecker.java @@ -18,11 +18,13 @@ */ package org.apache.stratos.autoscaler.status.checker; +import org.apache.stratos.autoscaler.monitor.Monitor; + /** * Group status checker will check the group status and * notify the interested parties on behalf of the status changes */ -public class GroupStatusChecker extends StatusChecker { +public class GroupStatusChecker { private String groupId; private String appId; @@ -30,8 +32,4 @@ public class GroupStatusChecker extends StatusChecker { this.groupId = groupId; this.appId = appId; } - @Override - public void run() { - - } } http://git-wip-us.apache.org/repos/asf/stratos/blob/0e1c582f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java index 154de32..06cf2f6 100644 --- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java @@ -20,29 +20,159 @@ package org.apache.stratos.autoscaler.status.checker; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.autoscaler.monitor.Monitor; -import org.apache.stratos.messaging.event.Event; -import org.apache.stratos.messaging.listener.EventListener; +import org.apache.stratos.autoscaler.AutoscalerContext; +import org.apache.stratos.autoscaler.NetworkPartitionContext; +import org.apache.stratos.autoscaler.PartitionContext; +import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor; +import org.apache.stratos.messaging.domain.topology.*; +import org.apache.stratos.messaging.domain.topology.util.GroupStatus; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; -import java.util.Observable; +import java.util.Map; /** * This will be used to evaluate the status of a group * and notify the interested parties about the status changes. 
*/ -public abstract class StatusChecker extends Observable implements Runnable { - +public class StatusChecker { private static final Log log = LogFactory.getLog(StatusChecker.class); - public void addObserver(EventListener eventListener) { - addObserver(eventListener); + + private StatusChecker() { + + } + + private static class Holder { + private static final StatusChecker INSTANCE = new StatusChecker(); + } + + public static StatusChecker getInstance() { + //TODO synchronized + return Holder.INSTANCE; } - public void notifyObservers(Monitor monitor) { - if(log.isDebugEnabled()) { - log.debug(String.format("Notifying the observers: [monitor] %s", monitor.getClass().getName())); + public void onMemberStatusChange(String clusterId1) { + final String clusterId = clusterId1; + Runnable exCluster = new Runnable() { + public void run() { + ClusterMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId); + boolean clusterActive = false; + for (NetworkPartitionContext networkPartitionContext : monitor.getNetworkPartitionCtxts().values()) { + //minimum check per partition + for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts().values()) { + if(partitionContext.getMinimumMemberCount() == partitionContext.getActiveMemberCount()) { + clusterActive = true; + } + clusterActive = false; + } + + } + // if active then notify upper layer + if(clusterActive) { + //send event to cluster status topic + + } + + } + }; + } + + + /** + * + * @param id + * @param appId + */ + public void onGroupStatusChange(final String id, final String appId) { + + Runnable exGroup = new Runnable() { + public void run() { + /** + * + */ + Application application = TopologyManager.getTopology().getApplication(appId); + Map<String, String> clusterIds = application.getClusterMap(); + Map<String, Group> groups = application.getGroupMap(); + updateChildStatus(id, groups, clusterIds, application); + } + }; + } + + public void onClusterStatusChange(final String id, 
final String appId) { + + Runnable exGroup = new Runnable() { + public void run() { + Application application = TopologyManager.getTopology().getApplication(appId); + Map<String, String> clusterIds = application.getClusterMap(); + Map<String, Group> groups = application.getGroupMap(); + updateChildStatus(id, groups, clusterIds, application); + } + }; + } + + private boolean updateChildStatus(String id, Map<String, Group> groups, Map<String, String> clusterIds, ParentBehavior parent) { + boolean groupActive = false; + boolean clustersActive = false; + boolean groupsActive = false; + boolean childFound = false; + + if(clusterIds.containsValue(id) || groups.containsKey(id)) { + childFound = true; + if(!clusterIds.isEmpty() && !groups.isEmpty()) { + clustersActive = getClusterStatus(clusterIds); + groupsActive = getGroupStatus(groups); + groupActive = clustersActive && groupsActive; + } else if (!groups.isEmpty()){ + groupsActive = getGroupStatus(groups); + groupActive = groupsActive; + } else if (!clusterIds.isEmpty()){ + clustersActive = getClusterStatus(clusterIds); + groupActive = clustersActive; + } else { + //TODO warn log + } + //send the activation event + if(parent instanceof Application && groupActive) { + //TODO send application activated event + } else if(parent instanceof Group && groupActive) { + //TODO send Group activated event + } + return childFound; + } else { + if(!groups.isEmpty()) { + for(Group group: groups.values()) { + return updateChildStatus(id, group.getGroupMap(), group.getClusterMap(), group); + + } + } + } + return childFound; + } + + private boolean getGroupStatus(Map<String, Group> groups) { + boolean groupActiveStatus = false; + for(Group group: groups.values()) { + if(group.getStatus().equals(GroupStatus.Active)) { + groupActiveStatus = true; + } else { + groupActiveStatus = false; + } } - setChanged(); - notifyObservers(monitor); + return groupActiveStatus; + } + + private boolean getClusterStatus(Map<String, String> clusterIds) { + 
boolean clusterActiveStatus = false; + for(Map.Entry<String, String> clusterId: clusterIds.entrySet()) { + Service service = TopologyManager.getTopology().getService(clusterId.getKey()); + if(service.getCluster(clusterId.getValue()).getStatus().equals(ClusterStatus.Active)) { + clusterActiveStatus = true; + } else { + clusterActiveStatus = false; + } + } + return clusterActiveStatus; + } + } http://git-wip-us.apache.org/repos/asf/stratos/blob/0e1c582f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java index bc9d450..e637451 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java @@ -177,6 +177,9 @@ public class AutoscalerUtil { log.debug("Set the lb reference type: "+value); } } + + //register a status Checker + // set hasPrimary property // hasPrimary is true if there are primary members available in that cluster http://git-wip-us.apache.org/repos/asf/stratos/blob/0e1c582f/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Application.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Application.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Application.java index 34e7a9d..1f83888 100644 --- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Application.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Application.java @@ -71,6 +71,16 @@ public class Application implements ParentBehavior { } @Override + /* Returns this application's groupMap field itself, not a copy. */ public Map<String, Group> getGroupMap() { + return this.groupMap; + } + + @Override + /* Returns this application's clusterIdMap field itself, not a copy. */ public Map<String, String> getClusterMap() { + return this.clusterIdMap; + } + + @Override public Group getGroupRecursively(String groupAlias) { return travereAndCheckRecursively(groupMap.values(), groupAlias); http://git-wip-us.apache.org/repos/asf/stratos/blob/0e1c582f/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Group.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Group.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Group.java index 072e55d..c6c1f78 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Group.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Group.java @@ -67,6 +67,16 @@ public class Group implements ParentBehavior { } @Override + /* Returns this group's groupMap field (its nested groups) itself, not a copy. */ public Map<String, Group> getGroupMap() { + return this.groupMap; + } + + @Override + /* Returns this group's clusterIdMap field itself, not a copy. */ public Map<String, String> getClusterMap() { + return this.clusterIdMap; + } + + @Override public Group getGroupRecursively(String groupAlias) { return travereAndCheckRecursively(groupMap.values(), groupAlias); http://git-wip-us.apache.org/repos/asf/stratos/blob/0e1c582f/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/ParentBehavior.java ---------------------------------------------------------------------- diff
--git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/ParentBehavior.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/ParentBehavior.java index a21f8ee..a958ea8 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/ParentBehavior.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/ParentBehavior.java @@ -31,6 +31,10 @@ public interface ParentBehavior extends Serializable { public Group getGroup (String groupName); + /* Groups held directly by this parent, as a map. Application and Group both implement this by returning their groupMap field. NOTE(review): the key is presumably the group alias or name - confirm. */ public Map<String, Group> getGroupMap(); + + /* Clusters held directly by this parent, as a String-to-String map. The autoscaler status checker passes each key to Topology.getService(...) and each value to Service.getCluster(...), i.e. it reads an entry as service name to cluster id. */ public Map<String, String> getClusterMap(); + public Group getGroupRecursively (String groupAlias); public Collection<Group> getGroups (); http://git-wip-us.apache.org/repos/asf/stratos/blob/0e1c582f/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/util/GroupStatus.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/util/GroupStatus.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/util/GroupStatus.java new file mode 100644 index 0000000..e00d4fa --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/util/GroupStatus.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.domain.topology.util; + +/* Status values for a topology Group, each paired with an integer code exposed through getCode(). The autoscaler status checker compares Group.getStatus() against Active. NOTE(review): 'terminating' is the only constant that starts in lower case; renaming it would break existing references - confirm before normalising. */ public enum GroupStatus { + + Created(1), + In_Maintenance(2), + terminating(3), + Removed(4), + Running(5), + Active(6); + + /* Integer code passed to the constructor. NOTE(review): nothing in this enum reassigns it, so it could be declared final. */ private int code; + + private GroupStatus(int code) { + this.code = code; + } + + /* Returns the integer code given to this constant's constructor. */ public int getCode() { + return code; + } + +} http://git-wip-us.apache.org/repos/asf/stratos/blob/0e1c582f/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterActivatedEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterActivatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterActivatedEvent.java index f041358..c1ce207 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterActivatedEvent.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ClusterActivatedEvent.java @@ -19,15 +19,17 @@ package org.apache.stratos.messaging.event.topology; import org.apache.stratos.messaging.domain.topology.ClusterStatus; +import org.apache.stratos.messaging.event.Event; /** * Cluster activated event will be sent by Autoscaler */ -public class ClusterActivatedEvent { +public class ClusterActivatedEvent extends Event { private final String serviceName; private final String clusterId; private ClusterStatus status; +
private String appId; public ClusterActivatedEvent(String serviceName, String clusterId) { this.serviceName = serviceName; @@ -55,4 +57,12 @@ public class ClusterActivatedEvent { public void setStatus(ClusterStatus status) { this.status = status; } + + /* Returns the application id attached to this event. NOTE(review): the visible constructor takes only serviceName and clusterId, so this is presumably null until setAppId is called - confirm. */ public String getAppId() { + return appId; + } + + /* Sets the application id attached to this event. */ public void setAppId(String appId) { + this.appId = appId; + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/0e1c582f/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupActivatedEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupActivatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupActivatedEvent.java index 2081e3d..b5c8148 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupActivatedEvent.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupActivatedEvent.java @@ -31,4 +31,20 @@ public class GroupActivatedEvent extends Event { this.appId = appId; this.groupId = groupId; } + + /* Returns the application id; the constructor assigns it. */ public String getAppId() { + return appId; + } + + /* Replaces the application id assigned in the constructor. */ public void setAppId(String appId) { + this.appId = appId; + } + + /* Returns the group id; the constructor assigns it. */ public String getGroupId() { + return groupId; + } + + /* Replaces the group id assigned in the constructor. */ public void setGroupId(String groupId) { + this.groupId = groupId; + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/0e1c582f/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java index b048cfd..652ebe4 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java @@ -47,7 +47,6 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { private MemberSuspendedMessageProcessor memberSuspendedMessageProcessor; private MemberTerminatedMessageProcessor memberTerminatedMessageProcessor; private GroupActivatedProcessor groupActivatedProcessor; - private CompositeApplicationCreatedMessageProcessor compositeApplicationCreatedMessageProcessor; private CompositeApplicationRemovedMessageProcessor compositeApplicationRemovedMessageProcessor; private ApplicationCreatedMessageProcessor applicationCreatedMessageProcessor; private ApplicationRemovedMessageProcessor applicationRemovedMessageProcessor; @@ -105,9 +104,6 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { applicationRemovedMessageProcessor = new ApplicationRemovedMessageProcessor(); add(applicationRemovedMessageProcessor); - compositeApplicationCreatedMessageProcessor = new CompositeApplicationCreatedMessageProcessor(); - add(applicationCreatedMessageProcessor); - compositeApplicationRemovedMessageProcessor = new CompositeApplicationRemovedMessageProcessor(); add(applicationRemovedMessageProcessor); @@ -156,8 +152,6 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { applicationCreatedMessageProcessor.addEventListener(eventListener); } else if (eventListener instanceof ApplicationRemovedEventListener) { applicationRemovedMessageProcessor.addEventListener(eventListener); - } else if (eventListener instanceof 
CompositeApplicationCreatedEventListener) { - compositeApplicationCreatedMessageProcessor.addEventListener(eventListener); if (log.isDebugEnabled()) { log.debug("Grouping: added eventlistener to applicationCreatedMessageProcessor: " + eventListener); }
