Adding faulty member event handlers

Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/a754a642
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/a754a642
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/a754a642

Branch: refs/heads/master
Commit: a754a642622fb914ae9b6f84a85bb0b3a975a2e1
Parents: 7ea5cd9
Author: Udara Liyanage <[email protected]>
Authored: Mon Dec 2 19:46:57 2013 -0500
Committer: Udara Liyanage <[email protected]>
Committed: Mon Dec 2 22:08:02 2013 -0500

----------------------------------------------------------------------
 .../stratos/autoscaler/ClusterContext.java      | 29 ++++++++++++++
 .../deployment/policy/DeploymentPolicy.java     |  9 ++++-
 .../health/HealthEventMessageDelegator.java     | 40 ++++++++++++++++----
 .../processors/AutoscalerTopologyReceiver.java  | 39 +++++++++++++++++++
 .../event/topology/MemberStartedEvent.java      | 15 ++++++++
 5 files changed, 124 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a754a642/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java
index 82d9d0c..8f6da21 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java
@@ -61,6 +61,8 @@ public class ClusterContext {
     private Properties properties;
 
     private Map<String, MemberStatsContext> memberContextMap;
+    // Key: member id; value: id of the partition that member was started in
+    private Map<String, String> memberPartitionMap;
     private DeploymentPolicy deploymentPolicy;
 
     public ClusterContext(String clusterId, String serviceId, DeploymentPolicy 
deploymentPolicy, List<Partition> partitions) {
@@ -70,6 +72,7 @@ public class ClusterContext {
         this.setDeploymentPolicy(deploymentPolicy);
         partitionsOfThisCluster = new ArrayList<Partition>();
         memberContextMap = new HashMap<String, MemberStatsContext>();
+        setMemberPartitionMap(new HashMap<String, String>());
         partitionCountMap = new HashMap<String, Integer>();
         
         for (Partition partition : partitions) {
@@ -245,4 +248,30 @@ public class ClusterContext {
     public void setDeploymentPolicy(DeploymentPolicy deploymentPolicy) {
         this.deploymentPolicy = deploymentPolicy;
     }
+
+       /**
+        * Returns the member-to-partition lookup table held by this context.
+        * The returned map is the internal instance itself, not a copy.
+        *
+        * @return map keyed by member id whose values are partition ids
+        */
+       public Map<String, String> getMemberPartitionMap() {
+               return this.memberPartitionMap;
+       }
+
+       /**
+        * Replaces the member-to-partition lookup table with the given map.
+        * The reference is stored as-is; no copy is made.
+        *
+        * @param memberPartitionMap map keyed by member id whose values are partition ids
+        */
+       public void setMemberPartitionMap(Map<String, String> memberPartitionMap) {
+               this.memberPartitionMap = memberPartitionMap;
+       }
+       
+       /**
+        * Records the partition of a member, overwriting any partition
+        * previously recorded for the same member id.
+        *
+        * @param memberId    id of the member
+        * @param partitionId id of the partition to associate with the member
+        */
+       public void addMemberpartition(String memberId, String partitionId) {
+               memberPartitionMap.put(memberId, partitionId);
+       }
+       
+       /**
+        * Forgets the partition recorded for a member. Does nothing when no
+        * partition is recorded for the given member id.
+        *
+        * @param memberId id of the member whose entry is removed
+        */
+       public void removeMemberPartition(String memberId) {
+               memberPartitionMap.remove(memberId);
+       }
+       
+       /**
+        * Looks up the partition recorded for a member.
+        *
+        * @param memberId id of the member
+        * @return the recorded partition id, or {@code null} when the map holds
+        *         no entry for the member
+        */
+       public String getPartitonOfMember(String memberId) {
+               return memberPartitionMap.get(memberId);
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a754a642/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/deployment/policy/DeploymentPolicy.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/deployment/policy/DeploymentPolicy.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/deployment/policy/DeploymentPolicy.java
index 815fbd7..c253d9a 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/deployment/policy/DeploymentPolicy.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/deployment/policy/DeploymentPolicy.java
@@ -79,7 +79,14 @@ public class DeploymentPolicy implements Serializable{
     public List<Partition> getAllPartitions() {
         return allPartitions;
     }
-
+    
+    /**
+     * Finds a partition of this policy by its id. Ids are compared ignoring case.
+     *
+     * @param id the partition id to look for; may be {@code null}
+     * @return the first partition returned by {@link #getAllPartitions()} whose id
+     *         matches, or {@code null} when {@code id} is {@code null} or nothing matches
+     */
+    public Partition getPartitionById(String id) {
+        if (id == null) {
+            return null;
+        }
+        for (Partition p : this.getAllPartitions()) {
+            // Invoke equalsIgnoreCase on the known non-null argument so that a
+            // partition without an id is skipped instead of raising an NPE.
+            if (id.equalsIgnoreCase(p.getId())) {
+                return p;
+            }
+        }
+        return null;
+    }
     /**
      * Gets the value of the partition-groups.
      */

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a754a642/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
index 61a1706..da18a41 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
@@ -23,7 +23,12 @@ import com.google.gson.stream.JsonReader;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.AutoscalerContext;
+import org.apache.stratos.autoscaler.ClusterContext;
 import org.apache.stratos.autoscaler.Constants;
+import 
org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
+import org.apache.stratos.autoscaler.exception.SpawningException;
+import org.apache.stratos.autoscaler.exception.TerminationException;
+import org.apache.stratos.cloud.controller.deployment.partition.Partition;
 
 import javax.jms.TextMessage;
 
@@ -41,6 +46,7 @@ public class HealthEventMessageDelegator implements Runnable {
 
     private static final Log log = 
LogFactory.getLog(HealthEventMessageDelegator.class);
     private String eventName;
+    private String clusterId;
     private Map<String, String> messageProperties;
     @Override
     public void run() {
@@ -53,6 +59,7 @@ public class HealthEventMessageDelegator implements Runnable {
                                String messageText = message.getText();
 
                 messageProperties = setEventValues(messageText);
+                this.clusterId = messageProperties.get("cluster_id");
                 log.info("Received event " + eventName);
 //                for (Service service :  
TopologyManager.getTopology().getServices()){
 //
@@ -80,23 +87,23 @@ public class HealthEventMessageDelegator implements 
Runnable {
 //                        break;
 //                    }
 //                }
-                if(Constants.AVERAGE_REQUESTS_IN_FLIGHT.equals(eventName)){
-                       String clusterId = messageProperties.get("cluster_id");
+                if(Constants.AVERAGE_REQUESTS_IN_FLIGHT.equals(eventName)){    
                
                        Float messageValue = 
Float.parseFloat(messageProperties.get("value"));
                     
AutoscalerContext.getInstance().getClusterContext(clusterId).setAverageRequestsInFlight(messageValue);
 
-                }  else 
if(Constants.GRADIENT_OF_REQUESTS_IN_FLIGHT.equals(eventName)){
-                       String clusterId = messageProperties.get("cluster_id");
+                }  else 
if(Constants.GRADIENT_OF_REQUESTS_IN_FLIGHT.equals(eventName)){                 
       
                        Float messageValue = 
Float.parseFloat(messageProperties.get("value"));
                     
AutoscalerContext.getInstance().getClusterContext(clusterId).setRequestsInFlightGradient(messageValue);
 
                 }  else 
if(Constants.SECOND_DERIVATIVE_OF_REQUESTS_IN_FLIGHT.equals(eventName)){
-                       String clusterId = messageProperties.get("cluster_id");
                        Float messageValue = 
Float.parseFloat(messageProperties.get("value"));
                     
AutoscalerContext.getInstance().getClusterContext(clusterId).setRequestsInFlightSecondDerivative(messageValue);
 
                 }else if ("member_fault".equals(eventName)){
-                       // member with 
+                       String memberId = messageProperties.get("member_id");
+                       if(memberId != null && !memberId.isEmpty())
+                               log.error("MemberId is not included in the 
received message");
+                       handleMemberfaultEvent(memberId);                       
                 }
                 
                 // clear the message properties after handling the message.
@@ -109,7 +116,26 @@ public class HealthEventMessageDelegator implements 
Runnable {
         }
     }
 
-    public Map<String, String> setEventValues(String json) {
+    /**
+     * Handles a "member_fault" health event: terminates the faulty member and
+     * asks the cloud controller for a replacement in the partition recorded
+     * for that member in the cluster context of the current {@code clusterId}.
+     *
+     * The partition is resolved before the member is terminated, and a
+     * replacement is only requested when a partition could be resolved.
+     * Termination and spawning failures are logged and not propagated.
+     *
+     * @param memberId id of the faulty member; nothing is done when it is
+     *                 {@code null} or empty
+     */
+    private void handleMemberfaultEvent(String memberId) {
+        if (memberId == null || memberId.isEmpty()) {
+            log.error("Cannot handle member fault event: member id is missing");
+            return;
+        }
+
+        // Resolve the target partition up front: the member-terminated topology
+        // listener removes the member's partition mapping from the cluster context.
+        Partition partition = null;
+        ClusterContext clsCtx = AutoscalerContext.getInstance().getClusterContext(clusterId);
+        if (clsCtx != null) {
+            String partitionId = clsCtx.getPartitonOfMember(memberId);
+            if (partitionId != null) {
+                partition = clsCtx.getDeploymentPolicy().getPartitionById(partitionId);
+            }
+        }
+
+        try {
+            // terminate the faulty member
+            CloudControllerClient ccClient = CloudControllerClient.getInstance();
+            ccClient.terminate(memberId);
+
+            if (partition == null) {
+                // Never hand a null partition to the cloud controller.
+                log.error("Faulty member " + memberId + " was terminated, but no partition could be"
+                        + " resolved for it in cluster " + clusterId + "; no replacement was spawned");
+                return;
+            }
+
+            // start a new member in the same Partition
+            ccClient.spawnAnInstance(partition, clusterId);
+        } catch (TerminationException e) {
+            // Pass the exception as the throwable so the stack trace is logged.
+            log.error("Could not terminate faulty member " + memberId, e);
+        } catch (SpawningException e) {
+            log.error("Could not spawn a replacement for faulty member " + memberId, e);
+        }
+    }
+
+       public Map<String, String> setEventValues(String json) {
        
        Map<String, String> properties = new HashMap<String, String>();
        BufferedReader bufferedReader = new BufferedReader(new 
StringReader(json));

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a754a642/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java
index 9c91cf3..96fb3af 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java
@@ -21,6 +21,7 @@ package org.apache.stratos.autoscaler.topology.processors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.AutoscalerContext;
 import org.apache.stratos.autoscaler.ClusterContext;
 import org.apache.stratos.autoscaler.ClusterMonitor;
 import org.apache.stratos.autoscaler.exception.PartitionValidationException;
@@ -34,11 +35,14 @@ import 
org.apache.stratos.messaging.listener.topology.ClusterCreatedEventListene
 import 
org.apache.stratos.messaging.listener.topology.ClusterRemovedEventListener;
 import 
org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener;
 import 
org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener;
+import 
org.apache.stratos.messaging.listener.topology.MemberStartedEventListener;
+import 
org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener;
 import 
org.apache.stratos.messaging.listener.topology.ServiceRemovedEventListener;
 import org.apache.stratos.messaging.event.topology.*;
 import 
org.apache.stratos.messaging.message.processor.topology.TopologyEventProcessorChain;
 import 
org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
 
 import java.util.Collection;
 
@@ -139,6 +143,40 @@ public class AutoscalerTopologyReceiver implements 
Runnable {
 
         });
         
+        // When a member starts, remember which partition it belongs to so the
+        // partition can later be looked up by member id.
+        processorChain.addEventListener(new MemberStartedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                // Acquire before entering try: if acquisition fails, finally must
+                // not release a lock that was never obtained.
+                TopologyManager.acquireReadLock();
+                try {
+                    MemberStartedEvent e = (MemberStartedEvent) event;
+                    // NOTE(review): getClusterContext may return null for a cluster that
+                    // is not registered with the autoscaler - confirm and handle.
+                    ClusterContext clusCtx =
+                            AutoscalerContext.getInstance().getClusterContext(e.getClusterId());
+                    clusCtx.addMemberpartition(e.getMemberId(), e.getPartitionId());
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
+            }
+
+        });
+        
+        // When a member is terminated, drop its entry from the member-to-partition map.
+        processorChain.addEventListener(new MemberTerminatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                // Acquire before entering try: if acquisition fails, finally must
+                // not release a lock that was never obtained.
+                TopologyManager.acquireReadLock();
+                try {
+                    MemberTerminatedEvent e = (MemberTerminatedEvent) event;
+                    // NOTE(review): getClusterContext may return null for a cluster that
+                    // is not registered with the autoscaler - confirm and handle.
+                    ClusterContext clusCtx =
+                            AutoscalerContext.getInstance().getClusterContext(e.getClusterId());
+                    clusCtx.removeMemberPartition(e.getMemberId());
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
+            }
+
+        });
+        
         processorChain.addEventListener(new MemberActivatedEventListener() {
             @Override
             protected void onEvent(Event event) {
@@ -160,6 +198,7 @@ public class AutoscalerTopologyReceiver implements Runnable 
{
 //                }
             }
         });
+        
         processorChain.addEventListener(new ServiceRemovedEventListener() {
             @Override
             protected void onEvent(Event event) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a754a642/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberStartedEvent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberStartedEvent.java
 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberStartedEvent.java
index ce45e52..b2e2e61 100644
--- 
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberStartedEvent.java
+++ 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberStartedEvent.java
@@ -33,6 +33,7 @@ public class MemberStartedEvent extends TopologyEvent 
implements Serializable {
     private String memberId;
     private MemberStatus status;
     private Properties properties;
+    private String partitionId;
 
     public MemberStartedEvent(String serviceName, String clusterId, String 
memberId) {
         this.serviceName = serviceName;
@@ -67,4 +68,18 @@ public class MemberStartedEvent extends TopologyEvent 
implements Serializable {
     public void setProperties(Properties properties) {
         this.properties = properties;
     }
+
+       /**
+        * Returns the partition id carried by this event.
+        *
+        * @return the partition id, or {@code null} if none has been set
+        */
+       public String getPartitionId() {
+               return this.partitionId;
+       }
+
+       /**
+        * Stores the partition id carried by this event.
+        *
+        * @param partitionId the partition id to record on the event
+        */
+       public void setPartitionId(String partitionId) {
+               this.partitionId = partitionId;
+       }
 }

Reply via email to