Updated Branches: refs/heads/master 90a1af7a5 -> 83cbec461
Implemented tenant message processors Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/4fcdc4ff Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/4fcdc4ff Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/4fcdc4ff Branch: refs/heads/master Commit: 4fcdc4ff51b905b9e095081a1b65396c381aee49 Parents: 9ced655 Author: Imesh Gunaratne <[email protected]> Authored: Fri Dec 6 10:21:34 2013 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Fri Dec 6 10:21:34 2013 +0530 ---------------------------------------------------------------------- .../stratos/messaging/domain/tenant/Tenant.java | 4 ++ .../event/tenant/TenantCreatedEvent.java | 18 +++----- .../tenant/TenantCreatedMessageProcessor.java | 36 +++++++++++++++- .../tenant/TenantRemovedMessageProcessor.java | 45 +++++++++++++++++++- .../tenant/TenantUpdatedMessageProcessor.java | 45 +++++++++++++++++++- 5 files changed, 131 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/4fcdc4ff/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Tenant.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Tenant.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Tenant.java index 932cf72..039ff8c 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Tenant.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Tenant.java @@ -42,4 +42,8 @@ public class Tenant implements Serializable{ public String getTenantDomain() { return tenantDomain; } + + public void setTenantDomain(String tenantDomain) { + this.tenantDomain = tenantDomain; + } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/4fcdc4ff/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/TenantCreatedEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/TenantCreatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/TenantCreatedEvent.java index 34d3b69..2c37ef3 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/TenantCreatedEvent.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/tenant/TenantCreatedEvent.java @@ -19,6 +19,8 @@ package org.apache.stratos.messaging.event.tenant; +import org.apache.stratos.messaging.domain.tenant.Tenant; + import java.io.Serializable; /** @@ -27,19 +29,13 @@ import java.io.Serializable; public class TenantCreatedEvent extends TenantEvent implements Serializable { private static final long serialVersionUID = -5954900215964894383L; - private int tenantId; - private String tenantDomain; - - public TenantCreatedEvent(int tenantId, String tenantDomain) { - this.tenantId = tenantId; - this.tenantDomain = tenantDomain; - } + private Tenant tenant; - public int getTenantId() { - return tenantId; + public TenantCreatedEvent(Tenant tenant) { + this.tenant = tenant; } - public String getTenantDomain() { - return tenantDomain; + public Tenant getTenant() { + return tenant; } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/4fcdc4ff/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantCreatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantCreatedMessageProcessor.java index e41982c..ebd8b5d 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantCreatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantCreatedMessageProcessor.java @@ -19,7 +19,12 @@ package org.apache.stratos.messaging.message.processor.tenant; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.tenant.TenantCreatedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.receiver.tenant.TenantManager; +import org.apache.stratos.messaging.util.Util; /** * Tenant created message processor for triggering tenant created event @@ -27,13 +32,40 @@ import org.apache.stratos.messaging.message.processor.MessageProcessor; */ public class TenantCreatedMessageProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(TenantCreatedMessageProcessor.class); + + private MessageProcessor nextProcessor; + @Override public void setNext(MessageProcessor nextProcessor) { - + this.nextProcessor = nextProcessor; } @Override public boolean process(String type, String message, Object object) { - return false; + if (TenantCreatedEvent.class.getName().equals(type)) { + // Parse complete message and build event + TenantCreatedEvent event = (TenantCreatedEvent) Util.jsonToObject(message, TenantCreatedEvent.class); + + try { + TenantManager.acquireWriteLock(); + TenantManager.getInstance().addTenant(event.getTenant()); + if(log.isInfoEnabled()) { + log.info(String.format("Tenant created: [tenant-id] %d [tenant-domain] %s", event.getTenant().getTenantId(), event.getTenant().getTenantDomain())); + } + return true; + } + finally { + TenantManager.releaseWriteLock(); + } + } + else { + if(nextProcessor != null) { + return nextProcessor.process(type, message, object); + } + else { + throw new RuntimeException(String.format("Failed to process tenant message using available message processors: [type] %s [body] %s", type, message)); + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/4fcdc4ff/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantRemovedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantRemovedMessageProcessor.java index 27bb681..2ad5cb8 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantRemovedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantRemovedMessageProcessor.java @@ -19,7 +19,13 @@ package org.apache.stratos.messaging.message.processor.tenant; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.tenant.Tenant; +import org.apache.stratos.messaging.event.tenant.TenantRemovedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.receiver.tenant.TenantManager; +import org.apache.stratos.messaging.util.Util; /** * Tenant removed message processor for triggering tenant removed event @@ -27,13 +33,48 @@ import org.apache.stratos.messaging.message.processor.MessageProcessor; */ public class TenantRemovedMessageProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(TenantRemovedMessageProcessor.class); + + private MessageProcessor nextProcessor; + @Override public void setNext(MessageProcessor nextProcessor) { - + this.nextProcessor = nextProcessor; } @Override public boolean process(String type, String message, Object object) { - return false; + if (TenantRemovedEvent.class.getName().equals(type)) { + // Parse complete message and build event + TenantRemovedEvent event = (TenantRemovedEvent) Util.jsonToObject(message, TenantRemovedEvent.class); + + try { + TenantManager.acquireWriteLock(); + Tenant tenant = TenantManager.getInstance().getTenant(event.getTenantId()); + if(tenant == null) { + if(log.isWarnEnabled()) { + log.warn(String.format("Tenant not found: [tenant-id] %d", event.getTenantId())); + } + return false; + } + TenantManager.getInstance().removeTenant(event.getTenantId()); + + if(log.isInfoEnabled()) { + log.info(String.format("Tenant removed: [tenant-id] %d [tenant-domain] %s", tenant.getTenantId(), tenant.getTenantDomain())); + } + return true; + } + finally { + TenantManager.releaseWriteLock(); + } + } + else { + if(nextProcessor != null) { + return nextProcessor.process(type, message, object); + } + else { + throw new RuntimeException(String.format("Failed to process tenant message using available message processors: [type] %s [body] %s", type, message)); + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/4fcdc4ff/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantUpdatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantUpdatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantUpdatedMessageProcessor.java index eae5cdd..e89ef36 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantUpdatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/tenant/TenantUpdatedMessageProcessor.java @@ -19,7 +19,13 @@ package org.apache.stratos.messaging.message.processor.tenant; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.tenant.Tenant; +import org.apache.stratos.messaging.event.tenant.TenantUpdatedEvent; import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.message.receiver.tenant.TenantManager; +import org.apache.stratos.messaging.util.Util; /** * Tenant updated message processor for triggering tenant updated event @@ -27,13 +33,48 @@ import org.apache.stratos.messaging.message.processor.MessageProcessor; */ public class TenantUpdatedMessageProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(TenantUpdatedMessageProcessor.class); + + private MessageProcessor nextProcessor; + @Override public void setNext(MessageProcessor nextProcessor) { - + this.nextProcessor = nextProcessor; } @Override public boolean process(String type, String message, Object object) { - return false; + if (TenantUpdatedEvent.class.getName().equals(type)) { + // Parse complete message and build event + TenantUpdatedEvent event = (TenantUpdatedEvent) Util.jsonToObject(message, TenantUpdatedEvent.class); + + try { + TenantManager.acquireWriteLock(); + Tenant tenant = TenantManager.getInstance().getTenant(event.getTenantId()); + if(tenant == null) { + if(log.isWarnEnabled()) { + log.warn(String.format("Tenant not found: [tenant-id] %d", event.getTenantId())); + } + return false; + } + tenant.setTenantDomain(event.getTenantDomain()); + + if(log.isInfoEnabled()) { + log.info(String.format("Tenant updated: [tenant-id] %d [tenant-domain] %s", tenant.getTenantId(), tenant.getTenantDomain())); + } + return true; + } + finally { + TenantManager.releaseWriteLock(); + } + } + else { + if(nextProcessor != null) { + return nextProcessor.process(type, message, object); + } + else { + throw new RuntimeException(String.format("Failed to process tenant message using available message processors: [type] %s [body] %s", type, message)); + } + } } }
