Adding faulty member event handlers
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/a754a642 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/a754a642 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/a754a642 Branch: refs/heads/master Commit: a754a642622fb914ae9b6f84a85bb0b3a975a2e1 Parents: 7ea5cd9 Author: Udara Liyanage <[email protected]> Authored: Mon Dec 2 19:46:57 2013 -0500 Committer: Udara Liyanage <[email protected]> Committed: Mon Dec 2 22:08:02 2013 -0500 ---------------------------------------------------------------------- .../stratos/autoscaler/ClusterContext.java | 29 ++++++++++++++ .../deployment/policy/DeploymentPolicy.java | 9 ++++- .../health/HealthEventMessageDelegator.java | 40 ++++++++++++++++---- .../processors/AutoscalerTopologyReceiver.java | 39 +++++++++++++++++++ .../event/topology/MemberStartedEvent.java | 15 ++++++++ 5 files changed, 124 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a754a642/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java index 82d9d0c..8f6da21 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java @@ -61,6 +61,8 @@ public class ClusterContext { private Properties properties; private Map<String, MemberStatsContext> memberContextMap; + // Key- MemberId Value- partitionId + private Map<String, String> memberPartitionMap; private DeploymentPolicy deploymentPolicy; public ClusterContext(String clusterId, String serviceId, DeploymentPolicy deploymentPolicy, List<Partition> partitions) { @@ -70,6 +72,7 @@ public class ClusterContext { this.setDeploymentPolicy(deploymentPolicy); partitionsOfThisCluster = new ArrayList<Partition>(); memberContextMap = new HashMap<String, MemberStatsContext>(); + setMemberPartitionMap(new HashMap<String, String>()); partitionCountMap = new HashMap<String, Integer>(); for (Partition partition : partitions) { @@ -245,4 +248,30 @@ public class ClusterContext { public void setDeploymentPolicy(DeploymentPolicy deploymentPolicy) { this.deploymentPolicy = deploymentPolicy; } + + /** + * @return the memberPartitionMap + */ + public Map<String, String> getMemberPartitionMap() { + return memberPartitionMap; + } + + /** + * @param memberPartitionMap the memberPartitionMap to set + */ + public void setMemberPartitionMap(Map<String, String> memberPartitionMap) { + this.memberPartitionMap = memberPartitionMap; + } + + public void addMemberpartition(String memberId, String partitionId){ + this.memberPartitionMap.put(memberId, partitionId); + } + + public void removeMemberPartition(String memberId){ + this.memberPartitionMap.remove(memberId); + } + + public String getPartitonOfMember(String memberId){ + return this.memberPartitionMap.get(memberId); + } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a754a642/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/deployment/policy/DeploymentPolicy.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/deployment/policy/DeploymentPolicy.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/deployment/policy/DeploymentPolicy.java index 815fbd7..c253d9a 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/deployment/policy/DeploymentPolicy.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/deployment/policy/DeploymentPolicy.java @@ -79,7 +79,14 @@ public class DeploymentPolicy implements Serializable{ public List<Partition> getAllPartitions() { return allPartitions; } - + + public Partition getPartitionById(String id){ + for(Partition p : this.getAllPartitions()){ + if(p.getId().equalsIgnoreCase(id)) + return p; + } + return null; + } /** * Gets the value of the partition-groups. */ http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a754a642/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java index 61a1706..da18a41 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java @@ -23,7 +23,12 @@ import com.google.gson.stream.JsonReader; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.autoscaler.AutoscalerContext; +import org.apache.stratos.autoscaler.ClusterContext; import org.apache.stratos.autoscaler.Constants; +import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient; +import org.apache.stratos.autoscaler.exception.SpawningException; +import org.apache.stratos.autoscaler.exception.TerminationException; +import org.apache.stratos.cloud.controller.deployment.partition.Partition; import javax.jms.TextMessage; @@ -41,6 +46,7 @@ public class HealthEventMessageDelegator implements Runnable { private static final Log log = LogFactory.getLog(HealthEventMessageDelegator.class); private String eventName; + private String clusterId; private Map<String, String> messageProperties; @Override public void run() { @@ -53,6 +59,7 @@ public class HealthEventMessageDelegator implements Runnable { String messageText = message.getText(); messageProperties = setEventValues(messageText); + this.clusterId = messageProperties.get("cluster_id"); log.info("Received event " + eventName); // for (Service service : TopologyManager.getTopology().getServices()){ // @@ -80,23 +87,23 @@ public class HealthEventMessageDelegator implements Runnable { // break; // } // } - if(Constants.AVERAGE_REQUESTS_IN_FLIGHT.equals(eventName)){ - String clusterId = messageProperties.get("cluster_id"); + if(Constants.AVERAGE_REQUESTS_IN_FLIGHT.equals(eventName)){ Float messageValue = Float.parseFloat(messageProperties.get("value")); AutoscalerContext.getInstance().getClusterContext(clusterId).setAverageRequestsInFlight(messageValue); - } else if(Constants.GRADIENT_OF_REQUESTS_IN_FLIGHT.equals(eventName)){ - String clusterId = messageProperties.get("cluster_id"); + } else if(Constants.GRADIENT_OF_REQUESTS_IN_FLIGHT.equals(eventName)){ Float messageValue = Float.parseFloat(messageProperties.get("value")); AutoscalerContext.getInstance().getClusterContext(clusterId).setRequestsInFlightGradient(messageValue); } else if(Constants.SECOND_DERIVATIVE_OF_REQUESTS_IN_FLIGHT.equals(eventName)){ - String clusterId = messageProperties.get("cluster_id"); Float messageValue = Float.parseFloat(messageProperties.get("value")); AutoscalerContext.getInstance().getClusterContext(clusterId).setRequestsInFlightSecondDerivative(messageValue); }else if ("member_fault".equals(eventName)){ - // member with + String memberId = messageProperties.get("member_id"); + if(memberId != null && !memberId.isEmpty()) + log.error("MemberId is not included in the received message"); + handleMemberfaultEvent(memberId); } // clear the message properties after handling the message. @@ -109,7 +116,26 @@ public class HealthEventMessageDelegator implements Runnable { } } - public Map<String, String> setEventValues(String json) { + private void handleMemberfaultEvent(String memberId) { + try { + // terminate the faulty member + CloudControllerClient ccClient = CloudControllerClient.getInstance(); + ccClient.terminate(memberId); + + // start a new member in the same Partition + ClusterContext clsCtx = AutoscalerContext.getInstance().getClusterContext(clusterId); + String partitionId = clsCtx.getPartitonOfMember(memberId); + Partition partition = clsCtx.getDeploymentPolicy().getPartitionById(partitionId); + ccClient.spawnAnInstance(partition, clusterId); + + } catch (TerminationException e) { + log.error(e); + }catch(SpawningException e){ + log.error(e); + } + } + + public Map<String, String> setEventValues(String json) { Map<String, String> properties = new HashMap<String, String>(); BufferedReader bufferedReader = new BufferedReader(new StringReader(json)); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a754a642/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java index 9c91cf3..96fb3af 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java @@ -21,6 +21,7 @@ package org.apache.stratos.autoscaler.topology.processors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.AutoscalerContext; import org.apache.stratos.autoscaler.ClusterContext; import org.apache.stratos.autoscaler.ClusterMonitor; import org.apache.stratos.autoscaler.exception.PartitionValidationException; @@ -34,11 +35,14 @@ import org.apache.stratos.messaging.listener.topology.ClusterCreatedEventListene import org.apache.stratos.messaging.listener.topology.ClusterRemovedEventListener; import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener; import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener; +import org.apache.stratos.messaging.listener.topology.MemberStartedEventListener; +import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener; import org.apache.stratos.messaging.listener.topology.ServiceRemovedEventListener; import org.apache.stratos.messaging.event.topology.*; import org.apache.stratos.messaging.message.processor.topology.TopologyEventProcessorChain; import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator; import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; +import org.apache.stratos.messaging.event.topology.MemberStartedEvent; import java.util.Collection; @@ -139,6 +143,40 @@ public class AutoscalerTopologyReceiver implements Runnable { }); + processorChain.addEventListener(new MemberStartedEventListener() { + @Override + protected void onEvent(Event event) { + try { + TopologyManager.acquireReadLock(); + + MemberStartedEvent e = (MemberStartedEvent) event; + ClusterContext clusCtx = AutoscalerContext.getInstance().getClusterContext(e.getClusterId()); + clusCtx.addMemberpartition(e.getMemberId(), e.getPartitionId()); + } + finally{ + TopologyManager.releaseReadLock(); + } + } + + }); + + processorChain.addEventListener(new MemberTerminatedEventListener() { + @Override + protected void onEvent(Event event) { + + try { + TopologyManager.acquireReadLock(); + MemberTerminatedEvent e = (MemberTerminatedEvent) event; + ClusterContext clusCtx = AutoscalerContext.getInstance() + .getClusterContext(e.getClusterId()); + clusCtx.removeMemberPartition(e.getMemberId()); + } finally { + TopologyManager.releaseReadLock(); + } + } + + }); + processorChain.addEventListener(new MemberActivatedEventListener() { @Override protected void onEvent(Event event) { @@ -160,6 +198,7 @@ public class AutoscalerTopologyReceiver implements Runnable { // } } }); + processorChain.addEventListener(new ServiceRemovedEventListener() { @Override protected void onEvent(Event event) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a754a642/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberStartedEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberStartedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberStartedEvent.java index ce45e52..b2e2e61 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberStartedEvent.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberStartedEvent.java @@ -33,6 +33,7 @@ public class MemberStartedEvent extends TopologyEvent implements Serializable { private String memberId; private MemberStatus status; private Properties properties; + private String partitionId; public MemberStartedEvent(String serviceName, String clusterId, String memberId) { this.serviceName = serviceName; @@ -67,4 +68,18 @@ public class MemberStartedEvent extends TopologyEvent implements Serializable { public void setProperties(Properties properties) { this.properties = properties; } + + /** + * @return the partitionId + */ + public String getPartitionId() { + return partitionId; + } + + /** + * @param partitionId the partitionId to set + */ + public void setPartitionId(String partitionId) { + this.partitionId = partitionId; + } }
