Repository: stratos Updated Branches: refs/heads/4.0.0-grouping 855039889 -> 773a01eaa
groupTerminating and groupTerminated events, listenrs and prcessors Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/773a01ea Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/773a01ea Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/773a01ea Branch: refs/heads/4.0.0-grouping Commit: 773a01eaa519e77fa01e3eebd1d616e905105953 Parents: 8550398 Author: Udara Liyanage <[email protected]> Authored: Wed Oct 22 17:44:43 2014 +0530 Committer: Udara Liyanage <[email protected]> Committed: Wed Oct 22 17:45:26 2014 +0530 ---------------------------------------------------------------------- .../AutoscalerTopologyEventReceiver.java | 49 ++++++++- .../ApplicationStatusTopicReceiver.java | 20 ++++ .../controller/topology/TopologyBuilder.java | 80 +++++++++++++- .../topology/TopologyEventPublisher.java | 21 +++- .../status/GroupInInactivateEvent.java | 44 ++++++++ .../status/GroupInTerminatedEvent.java | 44 ++++++++ .../status/GroupInTerminatingEvent.java | 44 ++++++++ .../event/topology/GroupInactivatedEvent.java | 43 ++++++++ .../event/topology/GroupTerminatedEvent.java | 43 ++++++++ .../event/topology/GroupTerminatingEvent.java | 43 ++++++++ .../status/GroupInTerminatedEventListener.java | 27 +++++ .../status/GroupInTerminatingEventListener.java | 27 +++++ .../topology/GroupTerminatedEventListener.java | 27 +++++ .../topology/GroupTerminatingEventListener.java | 27 +++++ ...onStatusGroupTerminatedMessageProcessor.java | 61 ++++++++++ ...nStatusGroupTerminatingMessageProcessor.java | 61 ++++++++++ .../ApplicationStatusMessageProcessorChain.java | 15 ++- .../topology/GroupTerminatedProcessor.java | 110 +++++++++++++++++++ .../topology/GroupTerminatingProcessor.java | 110 +++++++++++++++++++ .../topology/TopologyMessageProcessorChain.java | 22 ++-- 20 files changed, 900 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java index 2f31d1b..7ae7df0 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java @@ -35,9 +35,14 @@ import org.apache.stratos.autoscaler.monitor.group.GroupMonitor; import org.apache.stratos.autoscaler.partition.PartitionManager; import org.apache.stratos.autoscaler.policy.PolicyManager; import org.apache.stratos.autoscaler.status.checker.StatusChecker; -import org.apache.stratos.messaging.domain.topology.*; +import org.apache.stratos.messaging.domain.topology.Application; +import org.apache.stratos.messaging.domain.topology.ApplicationStatus; +import org.apache.stratos.messaging.domain.topology.ClusterStatus; +import org.apache.stratos.messaging.domain.topology.GroupStatus; import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.event.application.status.GroupInTerminatingEvent; import org.apache.stratos.messaging.event.topology.*; +import org.apache.stratos.messaging.listener.application.status.GroupInTerminatingEventListener; import org.apache.stratos.messaging.listener.topology.*; import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; @@ -238,6 +243,48 @@ public class AutoscalerTopologyEventReceiver implements Runnable { } }); + topologyEventReceiver.addEventListener(new GroupTerminatingEventListener() { + @Override + protected void onEvent(Event event) { + + log.info("[GroupTerminatedEven] Received: " + event.getClass()); + + GroupTerminatingEvent groupTerminatingEvent = (GroupTerminatingEvent) event; + String appId = groupTerminatingEvent.getAppId(); + String groupId = groupTerminatingEvent.getGroupId(); + + ApplicationMonitor appMonitor = AutoscalerContext.getInstance().getAppMonitor(appId); + GroupMonitor monitor = (GroupMonitor) appMonitor.findGroupMonitorWithId(groupId); + + //changing the status in the monitor, will notify its parent monitor + monitor.setStatus(GroupStatus.Terminating); + + //starting the status checker to decide on the status of it's parent + //StatusChecker.getInstance().onGroupStatusChange(groupId, appId); + } + }); + + topologyEventReceiver.addEventListener(new GroupTerminatedEventListener() { + @Override + protected void onEvent(Event event) { + + log.info("[GroupTerminatedEven] Received: " + event.getClass()); + + GroupTerminatedEvent groupTerminatedEvent = (GroupTerminatedEvent) event; + String appId = groupTerminatedEvent.getAppId(); + String groupId = groupTerminatedEvent.getGroupId(); + + ApplicationMonitor appMonitor = AutoscalerContext.getInstance().getAppMonitor(appId); + GroupMonitor monitor = (GroupMonitor) appMonitor.findGroupMonitorWithId(groupId); + + //changing the status in the monitor, will notify its parent monitor + monitor.setStatus(GroupStatus.Terminated); + + //starting the status checker to decide on the status of it's parent + //StatusChecker.getInstance().onGroupStatusChange(groupId, appId); + } + }); + topologyEventReceiver.addEventListener(new ApplicationActivatedEventListener() { @Override protected void onEvent(Event event) { http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/application/status/receiver/ApplicationStatusTopicReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/application/status/receiver/ApplicationStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/application/status/receiver/ApplicationStatusTopicReceiver.java index cfcea8a..bfd0167 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/application/status/receiver/ApplicationStatusTopicReceiver.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/application/status/receiver/ApplicationStatusTopicReceiver.java @@ -23,9 +23,13 @@ import org.apache.commons.logging.LogFactory; import org.apache.stratos.cloud.controller.topology.TopologyBuilder; import org.apache.stratos.messaging.event.Event; import org.apache.stratos.messaging.event.application.status.*; +import org.apache.stratos.messaging.event.topology.GroupTerminatedEvent; +import org.apache.stratos.messaging.event.topology.GroupTerminatingEvent; import org.apache.stratos.messaging.listener.application.status.*; import org.apache.stratos.messaging.listener.topology.ClusterActivatedEventListener; import org.apache.stratos.messaging.listener.topology.GroupActivatedEventListener; +import org.apache.stratos.messaging.listener.topology.GroupTerminatedEventListener; +import org.apache.stratos.messaging.listener.topology.GroupTerminatingEventListener; import org.apache.stratos.messaging.message.receiver.application.status.ApplicationStatusEventReceiver; public class ApplicationStatusTopicReceiver implements Runnable { @@ -82,6 +86,22 @@ public class ApplicationStatusTopicReceiver implements Runnable { } }); + statusEventReceiver.addEventListener(new GroupTerminatedEventListener() { + @Override + protected void onEvent(Event event) { + TopologyBuilder.handleGroupTerminatedEvent((GroupInTerminatedEvent) event); + + } + }); + + statusEventReceiver.addEventListener(new GroupTerminatingEventListener() { + @Override + protected void onEvent(Event event) { + TopologyBuilder.handleGroupTerminatingEvent((GroupInTerminatingEvent) event); + + } + }); + statusEventReceiver.addEventListener(new ApplicationActivatedEventListener() { @Override http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java index c3286c5..cee40f3 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyBuilder.java @@ -32,14 +32,18 @@ import org.apache.stratos.cloud.controller.util.CloudControllerUtil; import org.apache.stratos.messaging.domain.topology.*; import org.apache.stratos.messaging.domain.topology.util.CompositeApplicationBuilder; import org.apache.stratos.messaging.event.application.status.*; +import org.apache.stratos.messaging.event.application.status.ApplicationActivatedEvent; +import org.apache.stratos.messaging.event.application.status.ApplicationCreatedEvent; +import org.apache.stratos.messaging.event.application.status.ApplicationInactivatedEvent; +import org.apache.stratos.messaging.event.application.status.ApplicationTerminatedEvent; +import org.apache.stratos.messaging.event.application.status.ApplicationTerminatingEvent; +import org.apache.stratos.messaging.event.application.status.ClusterActivatedEvent; +import org.apache.stratos.messaging.event.application.status.GroupActivatedEvent; import org.apache.stratos.messaging.event.instance.status.InstanceActivatedEvent; import org.apache.stratos.messaging.event.instance.status.InstanceMaintenanceModeEvent; import org.apache.stratos.messaging.event.instance.status.InstanceReadyToShutdownEvent; import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent; -import org.apache.stratos.messaging.event.topology.MemberActivatedEvent; -import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent; -import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent; -import org.apache.stratos.messaging.util.Constants; +import org.apache.stratos.messaging.event.topology.*; import org.wso2.carbon.registry.core.exceptions.RegistryException; import java.util.*; @@ -1022,4 +1026,72 @@ public class TopologyBuilder { TopologyManager.releaseWriteLock(); } } + + public static void handleGroupTerminatedEvent(GroupInTerminatedEvent event) { + Topology topology = TopologyManager.getTopology(); + Application application = topology.getApplication(event.getAppId()); + //update the status of the Group + if (application == null) { + log.warn(String.format("Application %s does not exist", + event.getAppId())); + return; + } + + Group group = application.getGroupRecursively(event.getGroupId()); + if (group == null) { + log.warn(String.format("Group %s does not exist", + event.getGroupId())); + return; + } + + org.apache.stratos.messaging.event.topology.GroupTerminatedEvent groupTerminatedTopologyEvent = + new org.apache.stratos.messaging.event.topology.GroupTerminatedEvent( + event.getAppId(), + event.getGroupId()); + try { + TopologyManager.acquireWriteLock(); + group.setStatus(GroupStatus.Terminated); + log.info("Group activated adding status started"); + + TopologyManager.updateTopology(topology); + } finally { + TopologyManager.releaseWriteLock(); + } + //publishing data + TopologyEventPublisher.sendGroupTerminatedEvent(groupTerminatedTopologyEvent); + } + + public static void handleGroupTerminatingEvent(GroupInTerminatingEvent event) { + Topology topology = TopologyManager.getTopology(); + Application application = topology.getApplication(event.getAppId()); + //update the status of the Group + if (application == null) { + log.warn(String.format("Application %s does not exist", + event.getAppId())); + return; + } + + Group group = application.getGroupRecursively(event.getGroupId()); + if (group == null) { + log.warn(String.format("Group %s does not exist", + event.getGroupId())); + return; + } + + org.apache.stratos.messaging.event.topology.GroupTerminatingEvent groupTerminatingTopologyEvent = + new org.apache.stratos.messaging.event.topology.GroupTerminatingEvent( + event.getAppId(), + event.getGroupId()); + try { + TopologyManager.acquireWriteLock(); + group.setStatus(GroupStatus.Terminating); + log.info("Group activated adding status started"); + + TopologyManager.updateTopology(topology); + } finally { + TopologyManager.releaseWriteLock(); + } + //publishing data + TopologyEventPublisher.sendGroupTerminatingEvent(groupTerminatingTopologyEvent); + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java index 93523a4..f6b247b 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/topology/TopologyEventPublisher.java @@ -26,11 +26,6 @@ import org.apache.stratos.cloud.controller.pojo.PortMapping; import org.apache.stratos.cloud.controller.util.CloudControllerUtil; import org.apache.stratos.messaging.broker.publish.EventPublisher; import org.apache.stratos.messaging.broker.publish.EventPublisherPool; -import org.apache.stratos.messaging.domain.topology.Cluster; -import org.apache.stratos.messaging.domain.topology.ConfigCompositeApplication; -import org.apache.stratos.messaging.domain.topology.Port; -import org.apache.stratos.messaging.domain.topology.ServiceType; -import org.apache.stratos.messaging.domain.topology.Topology; import org.apache.stratos.messaging.domain.topology.*; import org.apache.stratos.messaging.event.Event; import org.apache.stratos.messaging.event.instance.status.InstanceStartedEvent; @@ -290,4 +285,20 @@ public class TopologyEventPublisher { } publishEvent(applicationTerminatedEvent); } + + public static void sendGroupTerminatedEvent(GroupTerminatedEvent groupTerminatedTopologyEvent) { + if(log.isInfoEnabled()) { + log.info(String.format("Publishing group terminated event: [appId] %s", + groupTerminatedTopologyEvent.getAppId())); + } + publishEvent(groupTerminatedTopologyEvent); + } + + public static void sendGroupTerminatingEvent(GroupTerminatingEvent groupTerminatingTopologyEvent) { + if(log.isInfoEnabled()) { + log.info(String.format("Publishing group terminating event: [appId] %s", + groupTerminatingTopologyEvent.getAppId())); + } + publishEvent(groupTerminatingTopologyEvent); + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInInactivateEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInInactivateEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInInactivateEvent.java new file mode 100644 index 0000000..6aeada7 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInInactivateEvent.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.event.application.status; + +/** + * This event is fired by cartridge agent when it has started the server and + * applications are ready to serve the incoming requests. + */ +public class GroupInInactivateEvent extends StatusEvent { + private static final long serialVersionUID = 2625412714611885089L; + + private String groupId; + private String appId; + + public GroupInInactivateEvent(String appId, String groupId) { + this.appId = appId; + this.groupId = groupId; + } + + public String getGroupId() { + return this.groupId; + } + + public String getAppId() { + return appId; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInTerminatedEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInTerminatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInTerminatedEvent.java new file mode 100644 index 0000000..16cc0c7 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInTerminatedEvent.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.event.application.status; + +/** + * This event is fired by cartridge agent when it has started the server and + * applications are ready to serve the incoming requests. + */ +public class GroupInTerminatedEvent extends StatusEvent { + private static final long serialVersionUID = 2625412714611885089L; + + private String groupId; + private String appId; + + public GroupInTerminatedEvent(String appId, String groupId) { + this.appId = appId; + this.groupId = groupId; + } + + public String getGroupId() { + return this.groupId; + } + + public String getAppId() { + return appId; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInTerminatingEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInTerminatingEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInTerminatingEvent.java new file mode 100644 index 0000000..6eaf5c3 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/GroupInTerminatingEvent.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.messaging.event.application.status; + +/** + * This event is fired by cartridge agent when it has started the server and + * applications are ready to serve the incoming requests. + */ +public class GroupInTerminatingEvent extends StatusEvent { + private static final long serialVersionUID = 2625412714611885089L; + + private String groupId; + private String appId; + + public GroupInTerminatingEvent(String appId, String groupId) { + this.appId = appId; + this.groupId = groupId; + } + + public String getGroupId() { + return this.groupId; + } + + public String getAppId() { + return appId; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupInactivatedEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupInactivatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupInactivatedEvent.java new file mode 100644 index 0000000..176f709 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupInactivatedEvent.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.event.topology; + +import org.apache.stratos.messaging.event.Event; + +/** + * Group Activated Event which will be sent to Topology upon group activation + */ +public class GroupInactivatedEvent extends Event { + private String appId; + private String groupId; + + public GroupInactivatedEvent(String appId, String groupId) { + this.appId = appId; + this.groupId = groupId; + } + + public String getAppId() { + return appId; + } + + public String getGroupId() { + return groupId; + } + +} http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupTerminatedEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupTerminatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupTerminatedEvent.java new file mode 100644 index 0000000..667343c --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupTerminatedEvent.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.event.topology; + +import org.apache.stratos.messaging.event.Event; + +/** + * Group Activated Event which will be sent to Topology upon group activation + */ +public class GroupTerminatedEvent extends Event { + private String appId; + private String groupId; + + public GroupTerminatedEvent(String appId, String groupId) { + this.appId = appId; + this.groupId = groupId; + } + + public String getAppId() { + return appId; + } + + public String getGroupId() { + return groupId; + } + +} http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupTerminatingEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupTerminatingEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupTerminatingEvent.java new file mode 100644 index 0000000..31e0cc5 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupTerminatingEvent.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.event.topology; + +import org.apache.stratos.messaging.event.Event; + +/** + * Group Activated Event which will be sent to Topology upon group activation + */ +public class GroupTerminatingEvent extends Event { + private String appId; + private String groupId; + + public GroupTerminatingEvent(String appId, String groupId) { + this.appId = appId; + this.groupId = groupId; + } + + public String getAppId() { + return appId; + } + + public String getGroupId() { + return groupId; + } + +} http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/GroupInTerminatedEventListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/GroupInTerminatedEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/GroupInTerminatedEventListener.java new file mode 100644 index 0000000..8cb331c --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/GroupInTerminatedEventListener.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.listener.application.status; + +import org.apache.stratos.messaging.listener.EventListener; + +/** + * Created by reka on 9/22/14. + */ +public abstract class GroupInTerminatedEventListener extends EventListener { +} http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/GroupInTerminatingEventListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/GroupInTerminatingEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/GroupInTerminatingEventListener.java new file mode 100644 index 0000000..20c8b68 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/GroupInTerminatingEventListener.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.listener.application.status; + +import org.apache.stratos.messaging.listener.EventListener; + +/** + * Created by reka on 9/22/14. + */ +public abstract class GroupInTerminatingEventListener extends EventListener { +} http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupTerminatedEventListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupTerminatedEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupTerminatedEventListener.java new file mode 100644 index 0000000..af409e7 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupTerminatedEventListener.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.listener.topology; + +import org.apache.stratos.messaging.listener.EventListener; + +/** + * This will get triggered by the groups activation processor after processing the event + */ +public abstract class GroupTerminatedEventListener extends EventListener { +} http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupTerminatingEventListener.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupTerminatingEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupTerminatingEventListener.java new file mode 100644 index 0000000..ab4042f --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupTerminatingEventListener.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.listener.topology; + +import org.apache.stratos.messaging.listener.EventListener; + +/** + * This will get triggered by the groups activation processor after processing the event + */ +public abstract class GroupTerminatingEventListener extends EventListener { +} http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusGroupTerminatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusGroupTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusGroupTerminatedMessageProcessor.java new file mode 100644 index 0000000..93dd750 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusGroupTerminatedMessageProcessor.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.application.status; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.application.status.GroupInTerminatedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + +public class ApplicationStatusGroupTerminatedMessageProcessor extends MessageProcessor { + private static final Log log = + LogFactory.getLog(ApplicationStatusGroupTerminatedMessageProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + if (GroupInTerminatedEvent.class.getName().equals(type)) { + // Parse complete message and build event + GroupInTerminatedEvent event = + (GroupInTerminatedEvent) Util.jsonToObject(message, GroupInTerminatedEvent.class); + + if (log.isDebugEnabled()) { + log.debug("Received GroupTerminatingEvent: " + event.toString()); + } + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException( + String.format("Failed to process group in GroupTerminatingEvent message " + + "using available message processors: [type] %s [body] %s", type, message)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusGroupTerminatingMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusGroupTerminatingMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusGroupTerminatingMessageProcessor.java new file mode 100644 index 0000000..493bd6c --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusGroupTerminatingMessageProcessor.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.application.status; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.topology.GroupTerminatingEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + +public class ApplicationStatusGroupTerminatingMessageProcessor extends MessageProcessor { + private static final Log log = + LogFactory.getLog(ApplicationStatusGroupTerminatingMessageProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + if (GroupTerminatingEvent.class.getName().equals(type)) { + // Parse complete message and build event + GroupTerminatingEvent event = + (GroupTerminatingEvent) Util.jsonToObject(message, GroupTerminatingEvent.class); + + if (log.isDebugEnabled()) { + log.debug("Received GroupTerminatingEvent: " + event.toString()); + } + // Notify event listeners + notifyEventListeners(event); + return true; + } else { + if (nextProcessor != null) { + return nextProcessor.process(type, message, object); + } else { + throw new RuntimeException( + String.format("Failed to process group in GroupTerminatingEvent message " + + "using available message processors: [type] %s [body] %s", type, message)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusMessageProcessorChain.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusMessageProcessorChain.java index 3c32de8..d5dccbb 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusMessageProcessorChain.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/ApplicationStatusMessageProcessorChain.java @@ -43,6 +43,9 @@ public class ApplicationStatusMessageProcessorChain extends MessageProcessorChai private ApplicationStatusAppTerminatedMessageProcessor applicationStatusAppTerminatedMessageProcessor; private ApplicationStatusAppTerminatingMessageProcessor applicationStatusAppTerminatingMessageProcessor; + private ApplicationStatusGroupTerminatedMessageProcessor groupTerminatedMessageProcessor; + private ApplicationStatusGroupTerminatingMessageProcessor groupTerminatingMessageProcessor; + public void initialize() { // Add instance notifier event processors clusterActivatedMessageProcessor = new ApplicationStatusClusterActivatedMessageProcessor(); @@ -72,6 +75,11 @@ public class ApplicationStatusMessageProcessorChain extends MessageProcessorChai applicationStatusAppTerminatingMessageProcessor = new ApplicationStatusAppTerminatingMessageProcessor(); this.add(applicationStatusAppTerminatingMessageProcessor); + groupTerminatedMessageProcessor = new ApplicationStatusGroupTerminatedMessageProcessor(); + this.add(groupTerminatedMessageProcessor); + + groupTerminatingMessageProcessor = new ApplicationStatusGroupTerminatingMessageProcessor(); + this.add(groupTerminatingMessageProcessor); if (log.isDebugEnabled()) { log.debug("Instance notifier message processor chain initialized"); @@ -97,7 +105,12 @@ public class ApplicationStatusMessageProcessorChain extends MessageProcessorChai applicationStatusAppTerminatingMessageProcessor.addEventListener(eventListener); } else if(eventListener instanceof ApplicationTerminatedEventListener){ applicationStatusAppTerminatedMessageProcessor.addEventListener(eventListener); - } else { + } else if (eventListener instanceof GroupInTerminatingEventListener){ + groupTerminatingMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof GroupInTerminatedEventListener){ + groupTerminatedMessageProcessor.addEventListener(eventListener); + } else + { throw new RuntimeException("Unknown event listener"); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatedProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatedProcessor.java new file mode 100644 index 0000000..6196776 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatedProcessor.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.topology; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.topology.Application; +import org.apache.stratos.messaging.domain.topology.Group; +import org.apache.stratos.messaging.domain.topology.GroupStatus; +import org.apache.stratos.messaging.domain.topology.Topology; +import org.apache.stratos.messaging.event.topology.GroupTerminatedEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; +import org.apache.stratos.messaging.util.Util; + +/** + * This processor will act upon the Group activation events + */ +public class GroupTerminatedProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(GroupTerminatedProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + Topology topology = (Topology) object; + + if (GroupTerminatedEvent.class.getName().equals(type)) { + // Return if topology has not been initialized + if (!topology.isInitialized()) + return false; + + // Parse complete message and build event + GroupTerminatedEvent event = (GroupTerminatedEvent) Util. + jsonToObject(message, GroupTerminatedEvent.class); + + TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); + + try { + return doProcess(event, topology); + + } finally { + TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); + } + + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + } + } + } + + private boolean doProcess (GroupTerminatedEvent event,Topology topology) { + + // Validate event against the existing topology + Application application = topology.getApplication(event.getAppId()); + if (application == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Application does not exist: [service] %s", + event.getAppId())); + } + return false; + } + Group group = application.getGroupRecursively(event.getGroupId()); + + if (group == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(), + event.getGroupId())); + } + } else { + // Apply changes to the topology + if (!group.isStateTransitionValid(GroupStatus.Terminating)) { + log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Active); + } + group.setStatus(GroupStatus.Terminated); + if (log.isInfoEnabled()) { + log.info(String.format("Group updated as activated : %s", + group.getUniqueIdentifier())); + } + } + + // Notify event listeners + notifyEventListeners(event); + return true; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatingProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatingProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatingProcessor.java new file mode 100644 index 0000000..afec0f2 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupTerminatingProcessor.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.messaging.message.processor.topology; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.topology.Application; +import org.apache.stratos.messaging.domain.topology.Group; +import org.apache.stratos.messaging.domain.topology.GroupStatus; +import org.apache.stratos.messaging.domain.topology.Topology; +import org.apache.stratos.messaging.event.topology.GroupTerminatingEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater; +import org.apache.stratos.messaging.util.Util; + +/** + * This processor will act upon the Group activation events + */ +public class GroupTerminatingProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(GroupTerminatingProcessor.class); + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + Topology topology = (Topology) object; + + if (GroupTerminatingEvent.class.getName().equals(type)) { + // Return if topology has not been initialized + if (!topology.isInitialized()) + return false; + + // Parse complete message and build event + GroupTerminatingEvent event = (GroupTerminatingEvent) Util. + jsonToObject(message, GroupTerminatingEvent.class); + + TopologyUpdater.acquireWriteLockForApplication(event.getAppId()); + + try { + return doProcess(event, topology); + + } finally { + TopologyUpdater.releaseWriteLockForApplication(event.getAppId()); + } + + } else { + if (nextProcessor != null) { + // ask the next processor to take care of the message. + return nextProcessor.process(type, message, topology); + } else { + throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message)); + } + } + } + + private boolean doProcess (GroupTerminatingEvent event,Topology topology) { + + // Validate event against the existing topology + Application application = topology.getApplication(event.getAppId()); + if (application == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Application does not exist: [service] %s", + event.getAppId())); + } + return false; + } + Group group = application.getGroupRecursively(event.getGroupId()); + + if (group == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(), + event.getGroupId())); + } + } else { + // Apply changes to the topology + if (!group.isStateTransitionValid(GroupStatus.Active)) { + log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Active); + } + group.setStatus(GroupStatus.Terminating); + if (log.isInfoEnabled()) { + log.info(String.format("Group updated as Terminating : %s", + group.getUniqueIdentifier())); + } + } + + // Notify event listeners + notifyEventListeners(event); + return true; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/773a01ea/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java index 77e4b01..1fb3961 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java @@ -22,14 +22,10 @@ package org.apache.stratos.messaging.message.processor.topology; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.messaging.listener.EventListener; -import org.apache.stratos.messaging.listener.application.status.*; +import org.apache.stratos.messaging.listener.application.status.ApplicationInActivatedEventListener; +import org.apache.stratos.messaging.listener.application.status.ApplicationTerminatedEventListener; +import org.apache.stratos.messaging.listener.application.status.ApplicationTerminatingEventListener; import org.apache.stratos.messaging.listener.topology.*; -import org.apache.stratos.messaging.listener.topology.ApplicationActivatedEventListener; -import org.apache.stratos.messaging.listener.topology.ApplicationCreatedEventListener; -import org.apache.stratos.messaging.listener.topology.ClusterActivatedEventListener; -import org.apache.stratos.messaging.listener.topology.ClusterInActivateEventListener; -import org.apache.stratos.messaging.listener.topology.GroupActivatedEventListener; -import org.apache.stratos.messaging.listener.topology.GroupInActivateEventListener; import org.apache.stratos.messaging.message.processor.MessageProcessorChain; /** @@ -62,6 +58,8 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { private ApplicationInactivatedMessageProcessor applicationInactivatedMessageProcessor; private ApplicationTerminatedMessageProcessor applicationTerminatedMessageProcessor; private ApplicationTerminatingMessageProcessor applicationTerminatingMessageProcessor; + private GroupTerminatingProcessor groupTerminatingProcessor; + private GroupTerminatedProcessor groupTerminatedProcessor; public void initialize() { // Add topology event processors @@ -113,6 +111,12 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { groupInActivateProcessor = new GroupInActivateProcessor(); add(groupInActivateProcessor); + groupTerminatingProcessor = new GroupTerminatingProcessor(); + add(groupTerminatingProcessor); + + groupTerminatedProcessor = new GroupTerminatedProcessor(); + add(groupTerminatedProcessor); + applicationCreatedMessageProcessor = new ApplicationCreatedMessageProcessor(); add(applicationCreatedMessageProcessor); @@ -172,6 +176,10 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain { groupActivatedProcessor.addEventListener(eventListener); } else if (eventListener instanceof GroupInActivateEventListener) { groupInActivateProcessor.addEventListener(eventListener); + } else if (eventListener instanceof GroupTerminatedEventListener){ + groupTerminatedProcessor.addEventListener(eventListener); + } else if (eventListener instanceof GroupTerminatingEventListener){ + groupTerminatingProcessor.addEventListener(eventListener); } else if (eventListener instanceof ApplicationCreatedEventListener) { applicationCreatedMessageProcessor.addEventListener(eventListener); } else if (eventListener instanceof ApplicationUndeployedEventListener) {
