Repository: stratos Updated Branches: refs/heads/docker-grouping-merge be8885862 -> 15aea18e7
Fixing STRATOS-930 Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/08de40fd Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/08de40fd Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/08de40fd Branch: refs/heads/docker-grouping-merge Commit: 08de40fd9b0ea8128f0aec4612e54df27a6a925c Parents: 2534024 Author: R-Rajkumar <[email protected]> Authored: Thu Oct 30 15:19:07 2014 +0530 Committer: R-Rajkumar <[email protected]> Committed: Thu Oct 30 15:19:07 2014 +0530 ---------------------------------------------------------------------- .../org.apache.stratos.cloud.controller/pom.xml | 5 + .../impl/CloudControllerServiceImpl.java | 350 ++++++++++--------- .../internal/CloudControllerDSComponent.java | 1 + .../controller/topology/TopologyBuilder.java | 298 ++++++++-------- .../topology/TopologyEventPublisher.java | 72 +--- 5 files changed, 352 insertions(+), 374 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/08de40fd/components/org.apache.stratos.cloud.controller/pom.xml ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/pom.xml b/components/org.apache.stratos.cloud.controller/pom.xml index 1f63e3f..2f44e92 100644 --- a/components/org.apache.stratos.cloud.controller/pom.xml +++ b/components/org.apache.stratos.cloud.controller/pom.xml @@ -311,6 +311,11 @@ <artifactId>org.wso2.carbon.ntask.core</artifactId> <version>4.2.0</version> </dependency> + <dependency> + <groupId>org.apache.stratos</groupId> + <artifactId>org.apache.stratos.metadata.client</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> <properties> <gson2.version>2.2</gson2.version> http://git-wip-us.apache.org/repos/asf/stratos/blob/08de40fd/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java index 3d7be2a..34a611f 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java @@ -20,13 +20,20 @@ package org.apache.stratos.cloud.controller.impl; import com.google.common.collect.ImmutableSet; import com.google.common.net.InetAddresses; + +import org.apache.commons.collections.ListUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.application.parser.DefaultApplicationParser; import org.apache.stratos.cloud.controller.concurrent.PartitionValidatorCallable; +import org.apache.stratos.cloud.controller.concurrent.ScheduledThreadExecutor; import org.apache.stratos.cloud.controller.concurrent.ThreadExecutor; import org.apache.stratos.cloud.controller.deployment.partition.Partition; import org.apache.stratos.cloud.controller.exception.*; +import org.apache.stratos.cloud.controller.functions.ContainerClusterContextToKubernetesService; +import org.apache.stratos.cloud.controller.functions.ContainerClusterContextToReplicationController; +import org.apache.stratos.cloud.controller.functions.PodToMemberContext; import org.apache.stratos.cloud.controller.interfaces.ApplicationParser; import org.apache.stratos.cloud.controller.interfaces.CloudControllerService; import org.apache.stratos.cloud.controller.interfaces.Iaas; @@ -49,7 +56,6 @@ import org.apache.stratos.kubernetes.client.model.Label; import org.apache.stratos.kubernetes.client.model.Pod; import org.apache.stratos.kubernetes.client.model.ReplicationController; import org.apache.stratos.kubernetes.client.model.Service; -import org.apache.stratos.messaging.domain.topology.Application; import org.apache.stratos.messaging.domain.topology.Member; import org.apache.stratos.messaging.domain.topology.MemberStatus; import org.apache.stratos.messaging.util.Constants; @@ -59,6 +65,7 @@ import org.jclouds.compute.domain.NodeMetadataBuilder; import org.jclouds.compute.domain.Template; import org.jclouds.rest.ResourceNotFoundException; import org.wso2.carbon.registry.core.exceptions.RegistryException; +import org.apache.stratos.messaging.domain.topology.Application; import java.util.*; import java.util.Map.Entry; @@ -74,7 +81,7 @@ import java.util.concurrent.Future; */ public class CloudControllerServiceImpl implements CloudControllerService { - private static final Log log = LogFactory + private static final Log LOG = LogFactory .getLog(CloudControllerServiceImpl.class); private FasterLookUpDataHolder dataHolder = FasterLookUpDataHolder .getInstance(); @@ -102,22 +109,22 @@ public class CloudControllerServiceImpl implements CloudControllerService { currentData.setClusterIdToMemberContext(serializedObj.getClusterIdToMemberContext()); currentData.setCartridges(serializedObj.getCartridges()); currentData.setKubClusterIdToKubClusterContext(serializedObj.getKubClusterIdToKubClusterContext()); - currentData.setServiceGroups(serializedObj.getServiceGroups()); + currentData.setServiceGroups(serializedObj.getServiceGroups()); - if(log.isDebugEnabled()) { + if(LOG.isDebugEnabled()) { - log.debug("Cloud Controller Data is retrieved from registry."); + LOG.debug("Cloud Controller Data is retrieved from registry."); } } else { - if(log.isDebugEnabled()) { + if(LOG.isDebugEnabled()) { - log.debug("Cloud Controller Data cannot be found in registry."); + LOG.debug("Cloud Controller Data cannot be found in registry."); } } } catch (Exception e) { String msg = "Unable to acquire data from Registry. Hence, any historical data will not get reflected."; - log.warn(msg, e); + LOG.warn(msg, e); } } @@ -128,8 +135,8 @@ public class CloudControllerServiceImpl implements CloudControllerService { handleNullObject(cartridgeConfig, "Invalid Cartridge Definition: Definition is null."); - if(log.isDebugEnabled()){ - log.debug("Cartridge definition: " + cartridgeConfig.toString()); + if(LOG.isDebugEnabled()){ + LOG.debug("Cartridge definition: " + cartridgeConfig.toString()); } Cartridge cartridge = null; @@ -141,7 +148,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { "Invalid Cartridge Definition: Cartridge Type: " + cartridgeConfig.getType()+ ". Cause: Cannot instantiate a Cartridge Instance with the given Config. "+e.getMessage(); - log.error(msg, e); + LOG.error(msg, e); throw new InvalidCartridgeDefinitionException(msg, e); } @@ -156,11 +163,19 @@ public class CloudControllerServiceImpl implements CloudControllerService { throw new InvalidCartridgeDefinitionException(msg); } - for (IaasProvider iaasProvider : iaases) { - CloudControllerUtil.getIaas(iaasProvider); - } - } - + if (iaases == null || iaases.isEmpty()) { + String msg = + "Invalid Cartridge Definition: Cartridge Type: " + + cartridgeConfig.getType()+ + ". Cause: Iaases of this Cartridge is null or empty."; + LOG.error(msg); + throw new InvalidCartridgeDefinitionException(msg); + } + + for (IaasProvider iaasProvider : iaases) { + CloudControllerUtil.getIaas(iaasProvider); + } + // TODO transaction begins String cartridgeType = cartridge.getType(); if(dataHolder.getCartridge(cartridgeType) != null) { @@ -184,8 +199,10 @@ public class CloudControllerServiceImpl implements CloudControllerService { TopologyBuilder.handleServiceCreated(cartridgeList); // transaction ends - - log.info("Successfully deployed the Cartridge definition: " + cartridgeType); + + LOG.info("Successfully deployed the Cartridge definition: " + cartridgeType); + + } } private void populateNewCartridge(Cartridge cartridge, @@ -201,8 +218,8 @@ public class CloudControllerServiceImpl implements CloudControllerService { String partitionId = entry.getKey(); IaasProvider oldIaasProvider = entry.getValue(); if (newIaasProviders.contains(oldIaasProvider)) { - if (log.isDebugEnabled()) { - log.debug("Copying a partition from the Cartridge that is undeployed, to the new Cartridge. " + if (LOG.isDebugEnabled()) { + LOG.debug("Copying a partition from the Cartridge that is undeployed, to the new Cartridge. " + "[partition id] : "+partitionId+" [cartridge type] "+cartridge.getType() ); } cartridge.addIaasProvider(partitionId, newIaasProviders.get(newIaasProviders.indexOf(oldIaasProvider))); @@ -219,8 +236,8 @@ public class CloudControllerServiceImpl implements CloudControllerService { // invalidate partition validation cache dataHolder.removeFromCartridgeTypeToPartitionIds(cartridgeType); - if (log.isDebugEnabled()) { - log.debug("Partition cache invalidated for cartridge "+cartridgeType); + if (LOG.isDebugEnabled()) { + LOG.debug("Partition cache invalidated for cartridge "+cartridgeType); } persist(); @@ -230,150 +247,150 @@ public class CloudControllerServiceImpl implements CloudControllerService { cartridgeList.add(cartridge); TopologyBuilder.handleServiceRemoved(cartridgeList); - if(log.isInfoEnabled()) { - log.info("Successfully undeployed the Cartridge definition: " + cartridgeType); + if(LOG.isInfoEnabled()) { + LOG.info("Successfully undeployed the Cartridge definition: " + cartridgeType); } return; } } String msg = "Cartridge [type] "+cartridgeType+" is not a deployed Cartridge type."; - log.error(msg); + LOG.error(msg); throw new InvalidCartridgeTypeException(msg); } - + public void deployServiceGroup(ServiceGroup servicegroup) throws InvalidServiceGroupException { - - if (servicegroup == null) { + + if (servicegroup == null) { String msg = "Invalid ServiceGroup Definition: Definition is null."; - log.error(msg); + LOG.error(msg); throw new IllegalArgumentException(msg); } - - if(log.isDebugEnabled()) { - log.debug("CloudControllerServiceImpl:deployServiceGroup:" + servicegroup.getName()); + + if(LOG.isDebugEnabled()) { + LOG.debug("CloudControllerServiceImpl:deployServiceGroup:" + servicegroup.getName()); } + + String [] subGroups = servicegroup.getCartridges(); + - String [] subGroups = servicegroup.getCartridges(); - - - if(log.isDebugEnabled()) { - log.debug("CloudControllerServiceImpl:deployServiceGroup:subGroups" + subGroups); + if(LOG.isDebugEnabled()) { + LOG.debug("CloudControllerServiceImpl:deployServiceGroup:subGroups" + subGroups); if (subGroups != null) { - log.debug("CloudControllerServiceImpl:deployServiceGroup:subGroups:size" + subGroups.length); + LOG.debug("CloudControllerServiceImpl:deployServiceGroup:subGroups:size" + subGroups.length); } else { - log.debug("CloudControllerServiceImpl:deployServiceGroup:subGroups: is null"); + LOG.debug("CloudControllerServiceImpl:deployServiceGroup:subGroups: is null"); } } - - - Dependencies dependencies = servicegroup.getDependencies(); - - if(log.isDebugEnabled()) { - log.debug("CloudControllerServiceImpl:deployServiceGroup:dependencies" + dependencies); - } - - if (dependencies != null) { - String [] startupOrders = dependencies.getStartupOrders(); - - if(log.isDebugEnabled()) { - log.debug("CloudControllerServiceImpl:deployServiceGroup:startupOrders" + startupOrders); - - if (startupOrders != null) { - log.debug("CloudControllerServiceImpl:deployServiceGroup:startupOrder:size" + startupOrders.length); - } else { - log.debug("CloudControllerServiceImpl:deployServiceGroup:startupOrder: is null"); - } - } + + + Dependencies dependencies = servicegroup.getDependencies(); + + if(LOG.isDebugEnabled()) { + LOG.debug("CloudControllerServiceImpl:deployServiceGroup:dependencies" + dependencies); } - - dataHolder.addServiceGroup(servicegroup); - - this.persist(); - + + if (dependencies != null) { + String [] startupOrders = dependencies.getStartupOrders(); + + if(LOG.isDebugEnabled()) { + LOG.debug("CloudControllerServiceImpl:deployServiceGroup:startupOrders" + startupOrders); + + if (startupOrders != null) { + LOG.debug("CloudControllerServiceImpl:deployServiceGroup:startupOrder:size" + startupOrders.length); + } else { + LOG.debug("CloudControllerServiceImpl:deployServiceGroup:startupOrder: is null"); + } + } + } + + dataHolder.addServiceGroup(servicegroup); + + this.persist(); + } - + public void undeployServiceGroup(String name) throws InvalidServiceGroupException { - if(log.isDebugEnabled()) { - log.debug("CloudControllerServiceImpl:undeployServiceGroup: " + name); + if(LOG.isDebugEnabled()) { + LOG.debug("CloudControllerServiceImpl:undeployServiceGroup: " + name); } - + ServiceGroup serviceGroup = null; - + serviceGroup = dataHolder.getServiceGroup(name); - + if (serviceGroup != null) { if (dataHolder.getServiceGroups().remove(serviceGroup)) { persist(); - if(log.isInfoEnabled()) { - log.info("Successfully undeployed the Service Group definition: " + serviceGroup); + if(LOG.isInfoEnabled()) { + LOG.info("Successfully undeployed the Service Group definition: " + serviceGroup); } return; } - } - + } + String msg = "ServiceGroup " + name + " is not a deployed Service Group definition"; - log.error(msg); + LOG.error(msg); throw new InvalidServiceGroupException(msg); - + } - + @Override public ServiceGroup getServiceGroup (String name) throws InvalidServiceGroupException { - - if(log.isDebugEnabled()) { - log.debug("getServiceGroupDefinition:" + name); + + if(LOG.isDebugEnabled()) { + LOG.debug("getServiceGroupDefinition:" + name); } - - ServiceGroup serviceGroup = this.dataHolder.getServiceGroup(name); - - if (serviceGroup == null) { - if(log.isDebugEnabled()) { - log.debug("getServiceGroupDefinition: no entry found for service group " + name); + + ServiceGroup serviceGroup = this.dataHolder.getServiceGroup(name); + + if (serviceGroup == null) { + if(LOG.isDebugEnabled()) { + LOG.debug("getServiceGroupDefinition: no entry found for service group " + name); } - String msg = "ServiceGroup " + name + " is not a deployed Service Group definition"; - throw new InvalidServiceGroupException(msg); - } - - return serviceGroup; + String msg = "ServiceGroup " + name + " is not a deployed Service Group definition"; + throw new InvalidServiceGroupException(msg); + } + + return serviceGroup; } - + public String [] getServiceGroupSubGroups (String name) throws InvalidServiceGroupException { - ServiceGroup serviceGroup = this.getServiceGroup(name); - if (serviceGroup == null) { - throw new InvalidServiceGroupException("Invalid ServiceGroup " + serviceGroup); - } - - return serviceGroup.getSubGroups(); + ServiceGroup serviceGroup = this.getServiceGroup(name); + if (serviceGroup == null) { + throw new InvalidServiceGroupException("Invalid ServiceGroup " + serviceGroup); + } + + return serviceGroup.getSubGroups(); } - + /** - * + * */ public String [] getServiceGroupCartridges (String name) throws InvalidServiceGroupException { - ServiceGroup serviceGroup = this.getServiceGroup(name); - if (serviceGroup == null) { - throw new InvalidServiceGroupException("Invalid ServiceGroup " + serviceGroup); - } - String [] cs = serviceGroup.getCartridges(); - return cs; - + ServiceGroup serviceGroup = this.getServiceGroup(name); + if (serviceGroup == null) { + throw new InvalidServiceGroupException("Invalid ServiceGroup " + serviceGroup); + } + String [] cs = serviceGroup.getCartridges(); + return cs; + } - + public Dependencies getServiceGroupDependencies (String name) throws InvalidServiceGroupException { - ServiceGroup serviceGroup = this.getServiceGroup(name); - if (serviceGroup == null) { - throw new InvalidServiceGroupException("Invalid ServiceGroup " + serviceGroup); - } - return serviceGroup.getDependencies(); + ServiceGroup serviceGroup = this.getServiceGroup(name); + if (serviceGroup == null) { + throw new InvalidServiceGroupException("Invalid ServiceGroup " + serviceGroup); + } + return serviceGroup.getDependencies(); } - + @Override public MemberContext startInstance(MemberContext memberContext) throws UnregisteredCartridgeException, InvalidIaasProviderException { - if(log.isDebugEnabled()) { - log.debug("CloudControllerServiceImpl:startInstance"); + if(LOG.isDebugEnabled()) { + LOG.debug("CloudControllerServiceImpl:startInstance"); } handleNullObject(memberContext, "Instance start-up failed. Member is null."); @@ -385,7 +402,6 @@ public class CloudControllerServiceImpl implements CloudControllerService { LOG.debug("Received an instance spawn request : " + memberContext); } - ComputeService computeService = null; Template template = null; handleNullObject(partition, "Instance start-up failed. Specified Partition is null. " + @@ -413,8 +429,8 @@ public class CloudControllerServiceImpl implements CloudControllerService { IaasProvider iaasProvider = cartridge.getIaasProviderOfPartition(partitionId); if (iaasProvider == null) { - if (log.isDebugEnabled()) { - log.debug("IaasToPartitionMap "+cartridge.hashCode() + if (LOG.isDebugEnabled()) { + LOG.debug("IaasToPartitionMap "+cartridge.hashCode() + " for cartridge "+cartridgeType+ " and for partition: "+partitionId); } String msg = "Instance start-up failed. " @@ -427,7 +443,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { + cartridge.getPartitionToIaasProvider().keySet() .toString() + ". " + memberContext.toString() + ". "; - log.fatal(msg); + LOG.fatal(msg); throw new InvalidIaasProviderException(msg); } String type = iaasProvider.getType(); @@ -513,7 +529,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { } catch (Exception e) { String msg = "Failed to start an instance. " + memberContext.toString()+" Cause: "+e.getMessage(); - log.error(msg, e); + LOG.error(msg, e); throw new IllegalStateException(msg, e); } @@ -527,8 +543,8 @@ public class CloudControllerServiceImpl implements CloudControllerService { String snapshotId = volume.getSnapshotId(); if(StringUtils.isNotEmpty(volume.getVolumeId())){ // volumeID is specified, so not creating additional volumes - if(log.isDebugEnabled()){ - log.debug("Volume creation is skipping since a volume ID is specified. [Volume ID]" + volume.getVolumeId()); + if(LOG.isDebugEnabled()){ + LOG.debug("Volume creation is skipping since a volume ID is specified. [Volume ID]" + volume.getVolumeId()); } volume.setId(volume.getVolumeId()); }else{ @@ -544,8 +560,8 @@ public class CloudControllerServiceImpl implements CloudControllerService { StringBuilder persistencePayload = new StringBuilder(); if(isPersistenceMappingAvailable(ctx)){ for(Volume volume : ctx.getVolumes()){ - if(log.isDebugEnabled()){ - log.debug("Adding persistence mapping " + volume.toString()); + if(LOG.isDebugEnabled()){ + LOG.debug("Adding persistence mapping " + volume.toString()); } if(persistencePayload.length() != 0) { persistencePayload.append("|"); @@ -558,8 +574,8 @@ public class CloudControllerServiceImpl implements CloudControllerService { persistencePayload.append(volume.getMappingPath()); } } - if(log.isDebugEnabled()){ - log.debug("Persistence payload is" + persistencePayload.toString()); + if(LOG.isDebugEnabled()){ + LOG.debug("Persistence payload is" + persistencePayload.toString()); } return persistencePayload; } @@ -583,7 +599,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { } catch (RegistryException e) { String msg = "Failed to persist the Cloud Controller data in registry. Further, transaction roll back also failed."; - log.fatal(msg); + LOG.fatal(msg); throw new CloudControllerException(msg, e); } } @@ -603,7 +619,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { if(ctxt == null) { String msg = "Termination failed. Invalid Member Id: "+memberId; - log.error(msg); + LOG.error(msg); throw new InvalidMemberException(msg); } @@ -633,7 +649,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { // these will never be null, since we do not add null values for these. Cartridge cartridge = dataHolder.getCartridge(cartridgeType); - log.info("Starting to terminate an instance with member id : " + memberId + + LOG.info("Starting to terminate an instance with member id : " + memberId + " in partition id: " + partitionId + " of cluster id: " + clusterId + " and of cartridge type: " + cartridgeType); @@ -642,7 +658,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { "Termination of Member Id: " + memberId + " failed. " + "Cannot find a matching Cartridge for type: " + cartridgeType; - log.error(msg); + LOG.error(msg); throw new InvalidCartridgeTypeException(msg); } @@ -655,7 +671,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { // log information logTermination(ctxt); - log.error(msg); + LOG.error(msg); throw new InvalidMemberException(msg); } @@ -670,26 +686,24 @@ public class CloudControllerServiceImpl implements CloudControllerService { } catch (Exception e) { String msg = "Instance termination failed. "+ctxt.toString(); - log.error(msg, e); + LOG.error(msg, e); throw new CloudControllerException(msg, e); } } } - private class IpAllocator implements Runnable { + private class JcloudsInstanceCreator implements Runnable { private MemberContext memberContext; private IaasProvider iaasProvider; private String cartridgeType; - NodeMetadata node; - public IpAllocator(MemberContext memberContext, IaasProvider iaasProvider, - String cartridgeType, NodeMetadata node) { + public JcloudsInstanceCreator(MemberContext memberContext, IaasProvider iaasProvider, + String cartridgeType) { this.memberContext = memberContext; this.iaasProvider = iaasProvider; this.cartridgeType = cartridgeType; - this.node = node; } @Override @@ -897,7 +911,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { } catch (Exception e) { String msg = "Error occurred while allocating an ip address. " + memberContext.toString(); - log.error(msg, e); + LOG.error(msg, e); throw new CloudControllerException(msg, e); } @@ -952,7 +966,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { String msg = "Instance termination failed. " +ctxt.toString() + ". Cause: Unable to build Iaas of this " + iaasProvider.toString(); - log.error(msg, e); + LOG.error(msg, e); throw new CloudControllerException(msg, e); } @@ -969,7 +983,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { iaas.releaseAddress(ctxt.getAllocatedIpAddress()); } - log.info("Member is terminated: "+ctxt.toString()); + LOG.info("Member is terminated: "+ctxt.toString()); return iaasProvider; } @@ -986,8 +1000,8 @@ public class CloudControllerServiceImpl implements CloudControllerService { Iaas iaas = iaasProvider.getIaas(); iaas.detachVolume(ctxt.getInstanceId(), volumeId); } catch (ResourceNotFoundException ignore) { - if(log.isDebugEnabled()) { - log.debug(ignore); + if(LOG.isDebugEnabled()) { + LOG.debug(ignore); } } } @@ -1166,7 +1180,7 @@ public class CloudControllerServiceImpl implements CloudControllerService { } else { - TopologyBuilder.handleClusterMaintenanceMode(dataHolder.getClusterContext(clusterId_)); +// TopologyBuilder.handleClusterMaintenanceMode(dataHolder.getClusterContext(clusterId_)); Runnable terminateInTimeout = new Runnable() { @Override @@ -1418,6 +1432,24 @@ public class CloudControllerServiceImpl implements CloudControllerService { return dataHolder.getClusterContext(clusterId); } + public void deployApplicationDefinition (ApplicationContext applicationContext) throws ApplicationDefinitionException { + + ApplicationParser applicationParser = new DefaultApplicationParser(); + Application application = applicationParser.parse(applicationContext); + + // Create a Cluster Context obj. for each of the Clusters in the Application + for (ApplicationClusterContext applicationClusterContext : applicationParser.getApplicationClusterContexts()) { + dataHolder.addClusterContext(new ClusterContext(applicationClusterContext.getClusterId(), + applicationClusterContext.getCartridgeType(), applicationClusterContext.getTextPayload(), + applicationClusterContext.getHostName(), applicationClusterContext.isLbCluster(), null)); + } + + TopologyBuilder.handleApplicationDeployed(application, applicationParser.getApplicationClusterContexts(), + applicationParser.getPayloadData()); + + persist(); + } + @Override public MemberContext[] startContainers(ContainerClusterContext containerClusterContext) throws UnregisteredCartridgeException { @@ -1834,6 +1866,12 @@ public class CloudControllerServiceImpl implements CloudControllerService { } @Override + public void unDeployApplicationDefinition(String applicationId, int tenantId, String tenantDomain) throws ApplicationDefinitionException { + + TopologyBuilder.handleApplicationUndeployed(applicationId); + } + + @Override public MemberContext terminateContainer(String memberId) throws MemberTerminationFailedException { handleNullObject(memberId, "Failed to terminate member. Invalid Member id. [Member id] " + memberId); @@ -1887,29 +1925,5 @@ public class CloudControllerServiceImpl implements CloudControllerService { } } - public void deployApplicationDefinition (ApplicationContext applicationContext) throws ApplicationDefinitionException { - - ApplicationParser applicationParser = new DefaultApplicationParser(); - Application application = applicationParser.parse(applicationContext); - - // Create a Cluster Context obj. for each of the Clusters in the Application - for (ApplicationClusterContext applicationClusterContext : applicationParser.getApplicationClusterContexts()) { - dataHolder.addClusterContext(new ClusterContext(applicationClusterContext.getClusterId(), - applicationClusterContext.getCartridgeType(), applicationClusterContext.getTextPayload(), - applicationClusterContext.getHostName(), applicationClusterContext.isLbCluster())); - } - - TopologyBuilder.handleApplicationDeployed(application, applicationParser.getApplicationClusterContexts(), - applicationParser.getPayloadData()); - - persist(); - } - - @Override - public void unDeployApplicationDefinition(String applicationId, int tenantId, String tenantDomain) throws ApplicationDefinitionException { - - TopologyBuilder.handleApplicationUndeployed(applicationId); - } - } http://git-wip-us.apache.org/repos/asf/stratos/blob/08de40fd/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java index 88ff2b6..797f299 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerDSComponent.java @@ -32,6 +32,7 @@ import org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusE import org.apache.stratos.cloud.controller.topic.instance.status.InstanceStatusEventMessageListener; import org.apache.stratos.cloud.controller.util.CloudControllerConstants; import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder; +import org.apache.stratos.messaging.broker.publish.EventPublisherPool; import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber; import org.apache.stratos.messaging.message.receiver.application.status.ApplicationStatusEventReceiver; import org.apache.stratos.messaging.util.Constants; http://git-wip-us.apache.org/repos/asf/stratos/blob/08de40fd/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java index bf74095..5509266 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java @@ -57,8 +57,8 @@ public class TopologyBuilder { Service service; Topology topology = TopologyManager.getTopology(); if (cartridgeList == null) { - log.warn(String.format("Cartridge list is empty")); - return; + log.warn(String.format("Cartridge list is empty")); + return; } try { @@ -95,7 +95,7 @@ public class TopologyBuilder { public static void handleServiceRemoved(List<Cartridge> cartridgeList) { Topology topology = TopologyManager.getTopology(); - for (Cartridge cartridge : cartridgeList) { + for (Cartridge cartridge : cartridgeList) { if (topology.getService(cartridge.getType()).getClusters().size() == 0) { if (topology.serviceExists(cartridge.getType())) { try { @@ -107,7 +107,7 @@ public class TopologyBuilder { } TopologyEventPublisher.sendServiceRemovedEvent(cartridgeList); } else { - log.warn(String.format("Service %s does not exist..", cartridge.getType())); + log.warn(String.format("Service %s does not exist..", cartridge.getType())); } } else { log.warn("Subscription already exists. Hence not removing the service:" + cartridge.getType() @@ -116,8 +116,10 @@ public class TopologyBuilder { } } + public static void handleClusterCreated(Registrant registrant, boolean isLb) { - /*Topology topology = TopologyManager.getTopology(); + /** + Topology topology = TopologyManager.getTopology(); Service service; try { TopologyManager.acquireWriteLock(); @@ -129,17 +131,17 @@ public class TopologyBuilder { } Properties props = CloudControllerUtil.toJavaUtilProperties(registrant.getProperties()); - + Cluster cluster; String clusterId = registrant.getClusterId(); if (service.clusterExists(clusterId)) { // update the cluster cluster = service.getCluster(clusterId); cluster.addHostName(registrant.getHostName()); - if (service.getServiceType() == ServiceType.MultiTenant) { + if(service.getServiceType() == ServiceType.MultiTenant) { cluster.setTenantRange(registrant.getTenantRange()); } - if (service.getProperties().getProperty(Constants.IS_PRIMARY) != null) { + if(service.getProperties().getProperty(Constants.IS_PRIMARY) != null) { props.setProperty(Constants.IS_PRIMARY, service.getProperties().getProperty(Constants.IS_PRIMARY)); } cluster.setProperties(props); @@ -147,17 +149,16 @@ public class TopologyBuilder { setKubernetesCluster(cluster); } else { cluster = new Cluster(cartridgeType, clusterId, - registrant.getDeploymentPolicyName(), registrant.getAutoScalerPolicyName(), null); + registrant.getDeploymentPolicyName(), registrant.getAutoScalerPolicyName()); cluster.addHostName(registrant.getHostName()); - if (service.getServiceType() == ServiceType.MultiTenant) { + if(service.getServiceType() == ServiceType.MultiTenant) { cluster.setTenantRange(registrant.getTenantRange()); } - if (service.getProperties().getProperty(Constants.IS_PRIMARY) != null) { + if(service.getProperties().getProperty(Constants.IS_PRIMARY) != null) { props.setProperty(Constants.IS_PRIMARY, service.getProperties().getProperty(Constants.IS_PRIMARY)); } cluster.setProperties(props); cluster.setLbCluster(isLb); - //cluster.setStatus(Status.Created); setKubernetesCluster(cluster); cluster.setStatus(ClusterStatus.Created); service.addCluster(cluster); @@ -167,9 +168,11 @@ public class TopologyBuilder { } finally { TopologyManager.releaseWriteLock(); - }*/ + } + **/ } + private static void setKubernetesCluster(Cluster cluster) { boolean isKubernetesCluster = (cluster.getProperties().getProperty(StratosConstants.KUBERNETES_CLUSTER_ID) != null); if (log.isDebugEnabled()) { @@ -183,16 +186,16 @@ public class TopologyBuilder { Service service = topology.getService(ctxt.getCartridgeType()); String deploymentPolicy; if (service == null) { - log.warn(String.format("Service %s does not exist", + log.warn(String.format("Service %s does not exist", ctxt.getCartridgeType())); - return; + return; } if (!service.clusterExists(ctxt.getClusterId())) { - log.warn(String.format("Cluster %s does not exist for service %s", + log.warn(String.format("Cluster %s does not exist for service %s", ctxt.getClusterId(), ctxt.getCartridgeType())); - return; + return; } try { @@ -206,96 +209,94 @@ public class TopologyBuilder { TopologyEventPublisher.sendClusterRemovedEvent(ctxt, deploymentPolicy); } - /*public static void handleClusterMaintenanceMode(ClusterContext ctxt) { - - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(ctxt.getCartridgeType()); - if (service == null) { - log.warn(String.format("Service %s does not exist", - ctxt.getCartridgeType())); - return; - } + public static void handleMemberSpawned(String serviceName, + String clusterId, String partitionId, + String privateIp, String publicIp, MemberContext context) { + // adding the new member to the cluster after it is successfully started + // in IaaS. + Topology topology = TopologyManager.getTopology(); + Service service = topology.getService(serviceName); + Cluster cluster = service.getCluster(clusterId); + String memberId = context.getMemberId(); + String networkPartitionId = context.getNetworkPartitionId(); + String lbClusterId = context.getLbClusterId(); + long initTime = context.getInitTime(); + + if (cluster.memberExists(memberId)) { + log.warn(String.format("Member %s already exists", memberId)); + return; + } - if (!service.clusterExists(ctxt.getClusterId())) { - log.warn(String.format("Cluster %s does not exist for service %s", - ctxt.getClusterId(), - ctxt.getCartridgeType())); - return; - } + try { + TopologyManager.acquireWriteLock(); + Member member = new Member(serviceName, clusterId, + networkPartitionId, partitionId, memberId, initTime); + member.setStatus(MemberStatus.Created); + member.setMemberIp(privateIp); + member.setLbClusterId(lbClusterId); + member.setMemberPublicIp(publicIp); + member.setProperties(CloudControllerUtil.toJavaUtilProperties(context.getProperties())); + try { + // Update port mappings with generated service proxy port + // TODO: Need to properly fix with the latest Kubernetes version + String serviceHostPortStr = CloudControllerUtil.getProperty(context.getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT); + if(StringUtils.isEmpty(serviceHostPortStr)) { + log.warn("Kubernetes service host port not found for member: [member-id] " + memberId); + } - try { - TopologyManager.acquireWriteLock(); - Cluster cluster = service.getCluster(ctxt.getClusterId()); - if (!cluster.isStateTransitionValid(ClusterStatus.Inactive)) { - log.error("Invalid State Transition from " + cluster.getStatus() + " to " + ClusterStatus.Inactive); + Cartridge cartridge = FasterLookUpDataHolder.getInstance(). + getCartridge(serviceName); + List<PortMapping> portMappings = cartridge.getPortMappings(); + Port port; + // Adding ports to the member + for (PortMapping portMapping : portMappings) { + if (cluster.isKubernetesCluster() && (StringUtils.isNotEmpty(serviceHostPortStr))) { + port = new Port(portMapping.getProtocol(), + Integer.parseInt(serviceHostPortStr), + Integer.parseInt(portMapping.getProxyPort())); + member.addPort(port); + } else { + port = new Port(portMapping.getProtocol(), + Integer.parseInt(portMapping.getPort()), + Integer.parseInt(portMapping.getProxyPort())); + member.addPort(port); + } + } + } catch (Exception e) { + log.error("Could not update member port-map: [member-id] " + memberId, e); } - cluster.setStatus(ClusterStatus.Inactive); - TopologyManager.updateTopology(topology); - } finally { - TopologyManager.releaseWriteLock(); - } - TopologyEventPublisher.sendClusterMaintenanceModeEvent(ctxt); - }*/ - - - public static void handleMemberSpawned(String serviceName, - String clusterId, String partitionId, - String privateIp, String publicIp, MemberContext context) { - // adding the new member to the cluster after it is successfully started - // in IaaS. - Topology topology = TopologyManager.getTopology(); - Service service = topology.getService(serviceName); - Cluster cluster = service.getCluster(clusterId); - String memberId = context.getMemberId(); - String networkPartitionId = context.getNetworkPartitionId(); - String lbClusterId = context.getLbClusterId(); - - if (cluster.memberExists(memberId)) { - log.warn(String.format("Member %s already exists", memberId)); - return; - } - - try { - TopologyManager.acquireWriteLock(); - Member member = new Member(serviceName, clusterId, - networkPartitionId, partitionId, memberId); - //member.setStatus(MemberStatus.Created); - member.setMemberIp(privateIp); - member.setLbClusterId(lbClusterId); - member.setMemberPublicIp(publicIp); - member.setProperties(CloudControllerUtil.toJavaUtilProperties(context.getProperties())); - cluster.addMember(member); - TopologyManager.updateTopology(topology); - } finally { - TopologyManager.releaseWriteLock(); - } - - TopologyEventPublisher.sendInstanceSpawnedEvent(serviceName, clusterId, - networkPartitionId, partitionId, memberId, lbClusterId, - publicIp, privateIp, context); - } - + cluster.addMember(member); + TopologyManager.updateTopology(topology); + } finally { + TopologyManager.releaseWriteLock(); + } + + TopologyEventPublisher.sendInstanceSpawnedEvent(serviceName, clusterId, + networkPartitionId, partitionId, memberId, lbClusterId, + publicIp, privateIp, context); + } + public static void handleMemberStarted(InstanceStartedEvent instanceStartedEvent) { Topology topology = TopologyManager.getTopology(); Service service = topology.getService(instanceStartedEvent.getServiceName()); if (service == null) { - log.warn(String.format("Service %s does not exist", + log.warn(String.format("Service %s does not exist", instanceStartedEvent.getServiceName())); - return; + return; } if (!service.clusterExists(instanceStartedEvent.getClusterId())) { - log.warn(String.format("Cluster %s does not exist in service %s", + log.warn(String.format("Cluster %s does not exist in service %s", instanceStartedEvent.getClusterId(), instanceStartedEvent.getServiceName())); - return; + return; } Member member = service.getCluster(instanceStartedEvent.getClusterId()). getMember(instanceStartedEvent.getMemberId()); if (member == null) { - log.warn(String.format("Member %s does not exist", + log.warn(String.format("Member %s does not exist", instanceStartedEvent.getMemberId())); - return; + return; } try { @@ -315,12 +316,12 @@ public class TopologyBuilder { TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent); //publishing data CartridgeInstanceDataPublisher.publish(instanceStartedEvent.getMemberId(), - instanceStartedEvent.getPartitionId(), - instanceStartedEvent.getNetworkPartitionId(), - instanceStartedEvent.getClusterId(), - instanceStartedEvent.getServiceName(), - MemberStatus.Starting.toString(), - null); + instanceStartedEvent.getPartitionId(), + instanceStartedEvent.getNetworkPartitionId(), + instanceStartedEvent.getClusterId(), + instanceStartedEvent.getServiceName(), + MemberStatus.Starting.toString(), + null); } public static void handleMemberActivated(InstanceActivatedEvent instanceActivatedEvent) { @@ -328,28 +329,27 @@ public class TopologyBuilder { Service service = topology.getService(instanceActivatedEvent.getServiceName()); if (service == null) { log.warn(String.format("Service %s does not exist", - instanceActivatedEvent.getServiceName())); + instanceActivatedEvent.getServiceName())); return; } - + Cluster cluster = service.getCluster(instanceActivatedEvent.getClusterId()); if (cluster == null) { log.warn(String.format("Cluster %s does not exist", - instanceActivatedEvent.getClusterId())); + instanceActivatedEvent.getClusterId())); return; } - Member member = cluster.getMember(instanceActivatedEvent.getMemberId()); if (member == null) { - log.warn(String.format("Member %s does not exist", + log.warn(String.format("Member %s does not exist", instanceActivatedEvent.getMemberId())); - return; + return; } MemberActivatedEvent memberActivatedEvent = new MemberActivatedEvent(instanceActivatedEvent.getServiceName(), - instanceActivatedEvent.getClusterId(), instanceActivatedEvent.getNetworkPartitionId(), instanceActivatedEvent.getPartitionId(), instanceActivatedEvent.getMemberId()); + instanceActivatedEvent.getClusterId(), instanceActivatedEvent.getNetworkPartitionId(), instanceActivatedEvent.getPartitionId(), instanceActivatedEvent.getMemberId()); // grouping - set grouid memberActivatedEvent.setGroupId(instanceActivatedEvent.getGroupId()); @@ -361,9 +361,19 @@ public class TopologyBuilder { } member.setStatus(MemberStatus.Activated); log.info("member started event adding status activated"); - // Adding ports to the event - // TODO: Need to remove this since ports are now set in member spawned event - memberActivatedEvent.addPorts(member.getPorts()); + Cartridge cartridge = FasterLookUpDataHolder.getInstance(). + getCartridge(instanceActivatedEvent.getServiceName()); + + List<PortMapping> portMappings = cartridge.getPortMappings(); + Port port; + //adding ports to the event + for (PortMapping portMapping : portMappings) { + port = new Port(portMapping.getProtocol(), + Integer.parseInt(portMapping.getPort()), + Integer.parseInt(portMapping.getProxyPort())); + member.addPort(port); + memberActivatedEvent.addPort(port); + } memberActivatedEvent.setMemberIp(member.getMemberIp()); memberActivatedEvent.setMemberPublicIp(member.getMemberPublicIp()); @@ -375,29 +385,29 @@ public class TopologyBuilder { TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent); //publishing data CartridgeInstanceDataPublisher.publish(memberActivatedEvent.getMemberId(), - memberActivatedEvent.getPartitionId(), - memberActivatedEvent.getNetworkPartitionId(), - memberActivatedEvent.getClusterId(), - memberActivatedEvent.getServiceName(), - MemberStatus.Activated.toString(), - null); + memberActivatedEvent.getPartitionId(), + memberActivatedEvent.getNetworkPartitionId(), + memberActivatedEvent.getClusterId(), + memberActivatedEvent.getServiceName(), + MemberStatus.Activated.toString(), + null); } public static void handleMemberReadyToShutdown(InstanceReadyToShutdownEvent instanceReadyToShutdownEvent) - throws InvalidMemberException, InvalidCartridgeTypeException { + throws InvalidMemberException, InvalidCartridgeTypeException { Topology topology = TopologyManager.getTopology(); Service service = topology.getService(instanceReadyToShutdownEvent.getServiceName()); //update the status of the member if (service == null) { - log.warn(String.format("Service %s does not exist", - instanceReadyToShutdownEvent.getServiceName())); - return; + log.warn(String.format("Service %s does not exist", + instanceReadyToShutdownEvent.getServiceName())); + return; } Cluster cluster = service.getCluster(instanceReadyToShutdownEvent.getClusterId()); if (cluster == null) { log.warn(String.format("Cluster %s does not exist", - instanceReadyToShutdownEvent.getClusterId())); + instanceReadyToShutdownEvent.getClusterId())); return; } @@ -409,11 +419,11 @@ public class TopologyBuilder { return; } MemberReadyToShutdownEvent memberReadyToShutdownEvent = new MemberReadyToShutdownEvent( - instanceReadyToShutdownEvent.getServiceName(), - instanceReadyToShutdownEvent.getClusterId(), - instanceReadyToShutdownEvent.getNetworkPartitionId(), - instanceReadyToShutdownEvent.getPartitionId(), - instanceReadyToShutdownEvent.getMemberId()); + instanceReadyToShutdownEvent.getServiceName(), + instanceReadyToShutdownEvent.getClusterId(), + instanceReadyToShutdownEvent.getNetworkPartitionId(), + instanceReadyToShutdownEvent.getPartitionId(), + instanceReadyToShutdownEvent.getMemberId()); try { TopologyManager.acquireWriteLock(); @@ -430,30 +440,30 @@ public class TopologyBuilder { TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent); //publishing data CartridgeInstanceDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(), - instanceReadyToShutdownEvent.getPartitionId(), - instanceReadyToShutdownEvent.getNetworkPartitionId(), - instanceReadyToShutdownEvent.getClusterId(), - instanceReadyToShutdownEvent.getServiceName(), - MemberStatus.ReadyToShutDown.toString(), - null); + instanceReadyToShutdownEvent.getPartitionId(), + instanceReadyToShutdownEvent.getNetworkPartitionId(), + instanceReadyToShutdownEvent.getClusterId(), + instanceReadyToShutdownEvent.getServiceName(), + MemberStatus.ReadyToShutDown.toString(), + null); //termination of particular instance will be handled by autoscaler } - public static void handleMemberMaintenance(InstanceMaintenanceModeEvent instanceMaintenanceModeEvent) - throws InvalidMemberException, InvalidCartridgeTypeException { + public static void handleMemberMaintenance(InstanceMaintenanceModeEvent instanceMaintenanceModeEvent) + throws InvalidMemberException, InvalidCartridgeTypeException { Topology topology = TopologyManager.getTopology(); Service service = topology.getService(instanceMaintenanceModeEvent.getServiceName()); //update the status of the member if (service == null) { log.warn(String.format("Service %s does not exist", - instanceMaintenanceModeEvent.getServiceName())); + instanceMaintenanceModeEvent.getServiceName())); return; } Cluster cluster = service.getCluster(instanceMaintenanceModeEvent.getClusterId()); if (cluster == null) { log.warn(String.format("Cluster %s does not exist", - instanceMaintenanceModeEvent.getClusterId())); + instanceMaintenanceModeEvent.getClusterId())); return; } @@ -466,11 +476,11 @@ public class TopologyBuilder { MemberMaintenanceModeEvent memberMaintenanceModeEvent = new MemberMaintenanceModeEvent( - instanceMaintenanceModeEvent.getServiceName(), - instanceMaintenanceModeEvent.getClusterId(), - instanceMaintenanceModeEvent.getNetworkPartitionId(), - instanceMaintenanceModeEvent.getPartitionId(), - instanceMaintenanceModeEvent.getMemberId()); + instanceMaintenanceModeEvent.getServiceName(), + instanceMaintenanceModeEvent.getClusterId(), + instanceMaintenanceModeEvent.getNetworkPartitionId(), + instanceMaintenanceModeEvent.getPartitionId(), + instanceMaintenanceModeEvent.getMemberId()); try { TopologyManager.acquireWriteLock(); // try update lifecycle state @@ -495,23 +505,23 @@ public class TopologyBuilder { Properties properties; if (service == null) { log.warn(String.format("Service %s does not exist", - serviceName)); + serviceName)); return; } Cluster cluster = service.getCluster(clusterId); if (cluster == null) { log.warn(String.format("Cluster %s does not exist", - clusterId)); + clusterId)); return; } - + Member member = cluster.getMember(memberId); - if (member == null) { - log.warn(String.format("Member with nodeID %s does not exist", - memberId)); - return; - } + if (member == null) { + log.warn(String.format("Member with member id %s does not exist", + memberId)); + return; + } try { TopologyManager.acquireWriteLock(); http://git-wip-us.apache.org/repos/asf/stratos/blob/08de40fd/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java index 7ac8334..b561afe 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java @@ -1,4 +1,3 @@ -package org.apache.stratos.cloud.controller.topology; /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -17,8 +16,8 @@ package org.apache.stratos.cloud.controller.topology; * specific language governing permissions and limitations * under the License. */ -import java.util.List; -import java.util.Properties; + +package org.apache.stratos.cloud.controller.topology; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,25 +28,10 @@ import org.apache.stratos.cloud.controller.pojo.PortMapping; import org.apache.stratos.cloud.controller.util.CloudControllerUtil; import org.apache.stratos.messaging.broker.publish.EventPublisher; import org.apache.stratos.messaging.broker.publish.EventPublisherPool; -import org.apache.stratos.messaging.domain.topology.Cluster; -import org.apache.stratos.messaging.domain.topology.ClusterStatus; -import org.apache.stratos.messaging.domain.topology.Port; -import org.apache.stratos.messaging.domain.topology.ServiceType; -import org.apache.stratos.messaging.domain.topology.Topology; +import org.apache.stratos.messaging.domain.topology.*; import org.apache.stratos.messaging.event.Event; import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent; -import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent; -import org.apache.stratos.messaging.event.topology.ClusterMaintenanceModeEvent; -import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent; -import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent; -import org.apache.stratos.messaging.event.topology.InstanceSpawnedEvent; -import org.apache.stratos.messaging.event.topology.MemberActivatedEvent; -import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent; -import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent; -import org.apache.stratos.messaging.event.topology.MemberStartedEvent; -import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent; -import org.apache.stratos.messaging.event.topology.ServiceCreatedEvent; -import org.apache.stratos.messaging.event.topology.ServiceRemovedEvent; +import org.apache.stratos.messaging.event.topology.*; import org.apache.stratos.messaging.util.Util; import java.util.List; @@ -58,8 +42,7 @@ import java.util.Set; * this is to send the relevant events from cloud controller to topology topic */ public class TopologyEventPublisher { - private static final Log log = LogFactory - .getLog(TopologyEventPublisher.class); + private static final Log log = LogFactory.getLog(TopologyEventPublisher.class); public static void sendServiceCreateEvent(List<Cartridge> cartridgeList) { ServiceCreatedEvent serviceCreatedEvent; @@ -100,13 +83,11 @@ public class TopologyEventPublisher { } } - public static void sendClusterCreatedEvent(String serviceName, - String clusterId, Cluster cluster) { - ClusterCreatedEvent clusterCreatedEvent = new ClusterCreatedEvent( - serviceName, clusterId, cluster); + public static void sendClusterCreatedEvent(String appId, String serviceName, String clusterId) { + ClusterCreatedEvent clusterCreatedEvent = new ClusterCreatedEvent(appId, serviceName, clusterId); if (log.isInfoEnabled()) { - log.info("Publishing cluster created event: " + cluster.toString()); + log.info("Publishing cluster created event: " + clusterId); } publishEvent(clusterCreatedEvent); } @@ -140,7 +121,8 @@ public class TopologyEventPublisher { // } public static void sendClusterRemovedEvent(ClusterContext ctxt, String deploymentPolicy) { - + ClusterRemovedEvent clusterRemovedEvent = new ClusterRemovedEvent( + ctxt.getCartridgeType(), ctxt.getClusterId(), deploymentPolicy, ctxt.isLbCluster()); if (log.isInfoEnabled()) { log.info(String .format("Publishing cluster removed event: [service] %s [cluster] %s", @@ -150,21 +132,6 @@ public class TopologyEventPublisher { } - public static void sendClusterMaintenanceModeEvent(ClusterContext ctxt) { - - ClusterMaintenanceModeEvent clusterMaintenanceModeEvent = new ClusterMaintenanceModeEvent( - ctxt.getCartridgeType(), ctxt.getClusterId()); - clusterMaintenanceModeEvent.setStatus(ClusterStatus.In_Maintenance); - if (log.isInfoEnabled()) { - log.info(String - .format("Publishing cluster maintenance mode event: [service] %s [cluster] %s", - clusterMaintenanceModeEvent.getServiceName(), - clusterMaintenanceModeEvent.getClusterId())); - } - publishEvent(clusterMaintenanceModeEvent); - - } - public static void sendInstanceSpawnedEvent(String serviceName, String clusterId, String networkPartitionId, String partitionId, String memberId, String lbClusterId, String publicIp, @@ -188,20 +155,6 @@ public class TopologyEventPublisher { publishEvent(instanceSpawnedEvent); } - public static void sendInstanceSpawnedEvent(String serviceName, String clusterId, String networkPartitionId, String partitionId, String memberId, - String lbClusterId, String publicIp, String privateIp, MemberContext context) { - InstanceSpawnedEvent instanceSpawnedEvent = new InstanceSpawnedEvent(serviceName, clusterId, networkPartitionId, partitionId, memberId); - instanceSpawnedEvent.setLbClusterId(lbClusterId); - instanceSpawnedEvent.setMemberIp(privateIp); - instanceSpawnedEvent.setMemberPublicIp(publicIp); - instanceSpawnedEvent.setProperties(CloudControllerUtil.toJavaUtilProperties(context.getProperties())); - if(log.isInfoEnabled()) { - log.info(String.format("Publishing instance spawned event: [service] %s [cluster] %s [network-partition] %s [partition] %s [member] %s [lb-cluster-id] %s", - serviceName, clusterId, networkPartitionId, partitionId, memberId, lbClusterId)); - } - publishEvent(instanceSpawnedEvent); - } - public static void sendMemberStartedEvent(InstanceStartedEvent instanceStartedEvent) { MemberStartedEvent memberStartedEventTopology = new MemberStartedEvent(instanceStartedEvent.getServiceName(), instanceStartedEvent.getClusterId(), instanceStartedEvent.getNetworkPartitionId(), instanceStartedEvent.getPartitionId(), instanceStartedEvent.getMemberId()); @@ -308,11 +261,6 @@ public class TopologyEventPublisher { publishEvent(applicationActivatedEvent); } - public static void publishEvent(Event event) { - EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TOPOLOGY_TOPIC); - eventPublisher.publish(event); - } - public static void sendApplicationInactivatedEvent(ApplicationInactivatedEvent applicationActivatedEvent1) { if(log.isInfoEnabled()) { log.info(String.format("Publishing application in activated event: [appId] %s",
