Author: bdekruijff at gmail.com
Date: Tue Jan 11 10:49:48 2011
New Revision: 584
Log:
[sandbox] mcast disabling. support / static member support / member events /
more routing / robustness and fixes
Added:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ChannelMember.java
- copied, changed from r581,
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMember.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ChannelMemberAddedEvent.java
- copied, changed from r582,
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/MemberAddedEvent.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ChannelMemberRemovedEvent.java
- copied, changed from r582,
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/MemberRemovedEvent.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterChannelService.java
- copied, changed from r581,
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ChannelMemberImpl.java
- copied, changed from r581,
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterMemberImpl.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterUtilities.java
- copied, changed from r581,
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterMemberUtilities.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ClusterChannelCreator.java
- copied, changed from r582,
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ChannelBuilder.java
Removed:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMember.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/MemberAddedEvent.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/MemberRemovedEvent.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterMemberImpl.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterMemberUtilities.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ChannelBuilder.java
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMemberServiceBase.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDepublishMessage.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDiscoveryMessage.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointPublishMessage.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/ChannelMemberUtilitiesTest.java
Copied:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ChannelMember.java
(from r581,
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMember.java)
==============================================================================
---
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMember.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ChannelMember.java
Tue Jan 11 10:49:48 2011
@@ -16,7 +16,7 @@
*/
package org.amdatu.core.fabric.cluster;
-public interface ClusterMember {
+public interface ChannelMember {
- String getMemberId();
+ String getName();
}
Copied:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ChannelMemberAddedEvent.java
(from r582,
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/MemberAddedEvent.java)
==============================================================================
---
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/MemberAddedEvent.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ChannelMemberAddedEvent.java
Tue Jan 11 10:49:48 2011
@@ -1,21 +1,21 @@
package org.amdatu.core.fabric.cluster;
-public class MemberAddedEvent {
+public class ChannelMemberAddedEvent {
- private final String m_memberId;
+ private final String m_memberName;
- public MemberAddedEvent(final String memberId) {
- m_memberId = memberId;
+ public ChannelMemberAddedEvent(final String memberName) {
+ m_memberName = memberName;
}
- public String getMemberId() {
- return m_memberId;
+ public String getMemberName() {
+ return m_memberName;
}
@Override
public String toString() {
- StringBuilder sb = new
StringBuilder(MemberAddedEvent.class.getSimpleName() + "{");
- sb.append("\n\tmemberId: " + m_memberId);
+ StringBuilder sb = new
StringBuilder(ChannelMemberAddedEvent.class.getSimpleName() + "{");
+ sb.append("\n\tmemberName: " + m_memberName);
sb.append("\n}");
return sb.toString();
}
Copied:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ChannelMemberRemovedEvent.java
(from r582,
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/MemberRemovedEvent.java)
==============================================================================
---
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/MemberRemovedEvent.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ChannelMemberRemovedEvent.java
Tue Jan 11 10:49:48 2011
@@ -1,21 +1,21 @@
package org.amdatu.core.fabric.cluster;
-public class MemberRemovedEvent {
+public class ChannelMemberRemovedEvent {
- private final String m_memberId;
+ private final String m_memberName;
- public MemberRemovedEvent(final String memberId) {
- m_memberId = memberId;
+ public ChannelMemberRemovedEvent(final String memberName) {
+ m_memberName = memberName;
}
- public String getMemberId() {
- return m_memberId;
+ public String getMemberName() {
+ return m_memberName;
}
@Override
public String toString() {
- StringBuilder sb = new
StringBuilder(MemberRemovedEvent.class.getSimpleName() + "{");
- sb.append("\n\tmemberId: " + m_memberId);
+ StringBuilder sb = new
StringBuilder(ChannelMemberRemovedEvent.class.getSimpleName() + "{");
+ sb.append("\n\tmemberName: " + m_memberName);
sb.append("\n}");
return sb.toString();
}
Copied:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterChannelService.java
(from r581,
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java)
==============================================================================
---
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterMemberService.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/ClusterChannelService.java
Tue Jan 11 10:49:48 2011
@@ -51,7 +51,7 @@
* </pre>
*
*/
-public interface ClusterMemberService {
+public interface ClusterChannelService {
/**
* OSGi service registration property name that is used to publish the
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java
Tue Jan 11 10:49:48 2011
@@ -31,82 +31,82 @@
private static final long serialVersionUID = 1L;
- private String m_originClusterId;
- private String m_originMemberId;
- private String m_originServiceGroup;
-
- private String m_targetClusterId;
- private String m_targetMemberId;
- private String m_targetServiceGroup;
+ private String m_fromClusterChannel;
+ private String m_fromChannelMember;
+ private String m_fromServiceGroup;
+
+ private String m_toClusterChannel;
+ private String m_toChannelMember;
+ private String m_toServiceGroup;
public RoutableMessage(final String serviceGroup) {
- m_originServiceGroup = serviceGroup;
- m_targetServiceGroup = serviceGroup;
+ m_fromServiceGroup = serviceGroup;
+ m_toServiceGroup = serviceGroup;
}
- public RoutableMessage(final String clusterId, final String memberId,
final String serviceGroup) {
- m_targetClusterId = clusterId;
- m_targetServiceGroup = serviceGroup;
- m_targetMemberId = memberId;
+ public RoutableMessage(String clusterChannel, String channelMember, String
serviceGroup) {
+ m_toClusterChannel = clusterChannel;
+ m_toServiceGroup = serviceGroup;
+ m_toChannelMember = channelMember;
}
- public final String getOriginClusterId() {
- return m_originClusterId;
+ public final String getFromClusterChannel() {
+ return m_fromClusterChannel;
}
- public final void setOriginClusterId(String clusterId) {
- m_originClusterId = clusterId;
+ public final void setFromClusterChannel(String clusterChannel) {
+ m_fromClusterChannel = clusterChannel;
}
- public final String getOriginMemberId() {
- return m_originMemberId;
+ public final String getFromChannelMember() {
+ return m_fromChannelMember;
}
- public final void setOriginMemberId(String memberId) {
- m_originMemberId = memberId;
+ public final void setFromChannelMember(String channelMember) {
+ m_fromChannelMember = channelMember;
}
- public final String getOriginServiceGroup() {
- return m_originServiceGroup;
+ public final String getFromServiceGroup() {
+ return m_fromServiceGroup;
}
- public final void setOriginServcieGroup(String serviceGroup) {
- m_originServiceGroup = serviceGroup;
+ public final void setFromServiceGroup(String serviceGroup) {
+ m_fromServiceGroup = serviceGroup;
}
- public final String getTargetClusterId() {
- return m_targetClusterId;
+ public final String getToClusterChannel() {
+ return m_toClusterChannel;
}
- public final void setTargetClusterId(String clusterId) {
- m_targetClusterId = clusterId;
+ public final void setToClusterChannel(String clusterChannel) {
+ m_toClusterChannel = clusterChannel;
}
- public final String getTargetMemberId() {
- return m_targetMemberId;
+ public final String getToChannelMember() {
+ return m_toChannelMember;
}
- public final void setTargetMemberId(String memberId) {
- m_targetMemberId = memberId;
+ public final void setToChannelMember(String memberName) {
+ m_toChannelMember = memberName;
}
- public final String getTargetServiceGroup() {
- return m_targetServiceGroup;
+ public final String getToServiceGroup() {
+ return m_toServiceGroup;
}
- public final void setTargetServiceGroup(String serviceGroup) {
- m_targetServiceGroup = serviceGroup;
+ public final void setToServiceGroup(String serviceGroup) {
+ m_toServiceGroup = serviceGroup;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("RoutableMessage{");
- sb.append("\n\toriginClusterId=" + m_originClusterId);
- sb.append("\n\toriginMemberId=" + m_originMemberId);
- sb.append("\n\toriginServiceGroup=" + m_originServiceGroup);
- sb.append("\n\ttargetClusterId=" + m_targetClusterId);
- sb.append("\n\ttargetMemberId=" + m_targetMemberId);
- sb.append("\n\ttargetServiceGroup=" + m_targetServiceGroup);
+ sb.append("\n\tfromClusterChannel=" + m_fromClusterChannel);
+ sb.append("\n\tfromChannelMember=" + m_fromChannelMember);
+ sb.append("\n\tfromServiceGroup=" + m_fromServiceGroup);
+ sb.append("\n\ttoClusterChannel=" + m_toClusterChannel);
+ sb.append("\n\ttoChannelMember=" + m_toChannelMember);
+ sb.append("\n\ttoServiceGroup=" + m_toServiceGroup);
sb.append("\n}");
return sb.toString();
}
Copied:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ChannelMemberImpl.java
(from r581,
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterMemberImpl.java)
==============================================================================
---
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterMemberImpl.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ChannelMemberImpl.java
Tue Jan 11 10:49:48 2011
@@ -16,26 +16,26 @@
*/
package org.amdatu.core.fabric.cluster.internal;
-import org.amdatu.core.fabric.cluster.ClusterMember;
+import org.amdatu.core.fabric.cluster.ChannelMember;
-public class ClusterMemberImpl implements ClusterMember {
+public class ChannelMemberImpl implements ChannelMember {
- public final String m_memberId;
+ public final String m_name;
/********************************************************
* Constructors
********************************************************/
- public ClusterMemberImpl(final String memberId) {
- m_memberId = memberId;
+ public ChannelMemberImpl(final String memberName) {
+ m_name = memberName;
}
/********************************************************
* ClusterMember
********************************************************/
- public String getMemberId() {
- return m_memberId;
+ public String getName() {
+ return m_name;
}
/********************************************************
@@ -44,11 +44,11 @@
@Override
public boolean equals(Object obj) {
- return m_memberId.equals(((ClusterMemberImpl) obj).getMemberId());
+ return m_name.equals(((ChannelMemberImpl) obj).getName());
}
@Override
public int hashCode() {
- return m_memberId.hashCode();
+ return m_name.hashCode();
}
}
Copied:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterUtilities.java
(from r581,
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterMemberUtilities.java)
==============================================================================
---
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterMemberUtilities.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/ClusterUtilities.java
Tue Jan 11 10:49:48 2011
@@ -19,20 +19,20 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.amdatu.core.fabric.cluster.ClusterMemberService;
+import org.amdatu.core.fabric.cluster.ClusterChannelService;
-public final class ClusterMemberUtilities {
+public final class ClusterUtilities {
private final static Pattern m_validNamePattern =
Pattern.compile("[a-zA-Z0-9-_]+");
public static String getClusterChannelSendTopic(final String
clusterChannel) {
- return ClusterMemberService.EVENT_SEND_TOPIC_TEMPLATE.replace(
-
ClusterMemberService.EVENT_TOPIC_TEMPLATE_CLUSTERCHANNEL_PLACEHOLDER,
clusterChannel);
+ return ClusterChannelService.EVENT_SEND_TOPIC_TEMPLATE.replace(
+
ClusterChannelService.EVENT_TOPIC_TEMPLATE_CLUSTERCHANNEL_PLACEHOLDER,
clusterChannel);
}
public static String getClusterChannelReceiveTopic(final String
clusterChannel) {
- return ClusterMemberService.EVENT_RECIEVE_TOPIC_TEMPLATE.replace(
-
ClusterMemberService.EVENT_TOPIC_TEMPLATE_CLUSTERCHANNEL_PLACEHOLDER,
clusterChannel);
+ return ClusterChannelService.EVENT_RECIEVE_TOPIC_TEMPLATE.replace(
+
ClusterChannelService.EVENT_TOPIC_TEMPLATE_CLUSTERCHANNEL_PLACEHOLDER,
clusterChannel);
}
public static boolean isValidChannelName(final String channelName) {
Copied:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ClusterChannelCreator.java
(from r582,
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ChannelBuilder.java)
==============================================================================
---
/sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ChannelBuilder.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ClusterChannelCreator.java
Tue Jan 11 10:49:48 2011
@@ -36,7 +36,7 @@
* @see
http://svn.apache.org/repos/asf/tomcat/tc7.0.x/tags/TOMCAT_7_0_5/test/org/apache/catalina/tribes/demos/ChannelCreator.java
*
*/
-public class ChannelBuilder {
+public class ClusterChannelCreator {
/**
* Type of membership discovery. Default is "mcast" for dynamic discovery
and
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMemberServiceBase.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMemberServiceBase.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMemberServiceBase.java
Tue Jan 11 10:49:48 2011
@@ -24,14 +24,13 @@
import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.amdatu.core.fabric.cluster.ClusterMember;
-import org.amdatu.core.fabric.cluster.ClusterMemberService;
+import org.amdatu.core.fabric.cluster.ChannelMember;
+import org.amdatu.core.fabric.cluster.ChannelMemberAddedEvent;
+import org.amdatu.core.fabric.cluster.ChannelMemberRemovedEvent;
+import org.amdatu.core.fabric.cluster.ClusterChannelService;
import org.amdatu.core.fabric.cluster.LocalTopicMessage;
-import org.amdatu.core.fabric.cluster.MemberAddedEvent;
-import org.amdatu.core.fabric.cluster.MemberRemovedEvent;
import org.amdatu.core.fabric.cluster.RoutableMessage;
-import org.amdatu.core.fabric.cluster.internal.ClusterMemberUtilities;
-import org.amdatu.core.fabric.remote.internal.EndpointInvokeMessage;
+import org.amdatu.core.fabric.cluster.internal.ClusterUtilities;
import org.apache.felix.dm.Component;
import org.apache.felix.dm.DependencyManager;
import org.apache.felix.dm.ServiceDependency;
@@ -46,9 +45,9 @@
*
* TODO add a static(?) getConfigurationProperties method
*/
-public abstract class ClusterMemberServiceBase implements ClusterMemberService
{
+public abstract class ClusterMemberServiceBase implements
ClusterChannelService {
- private final Map<String, ClusterMember> m_clusterMembers = new
HashMap<String, ClusterMember>();
+ private final Map<String, ChannelMember> m_clusterMembers = new
HashMap<String, ChannelMember>();
private final ReentrantReadWriteLock m_clusterMembersLock = new
ReentrantReadWriteLock();
private final String m_clusterId;
@@ -80,8 +79,8 @@
m_properties.put(key, properties.get(key));
}
}
- m_recieveEventTopic =
ClusterMemberUtilities.getClusterChannelReceiveTopic(m_clusterId);
- m_sendEventTopic =
ClusterMemberUtilities.getClusterChannelSendTopic(m_clusterId);
+ m_recieveEventTopic =
ClusterUtilities.getClusterChannelReceiveTopic(m_clusterId);
+ m_sendEventTopic =
ClusterUtilities.getClusterChannelSendTopic(m_clusterId);
}
/********************************************************
@@ -138,17 +137,17 @@
}
- protected final ClusterMember[] getClusterMembers() {
+ protected final ChannelMember[] getClusterMembers() {
m_clusterMembersLock.readLock().lock();
try {
- return m_clusterMembers.values().toArray(new
ClusterMember[m_clusterMembers.size()]);
+ return m_clusterMembers.values().toArray(new
ChannelMember[m_clusterMembers.size()]);
}
finally {
m_clusterMembersLock.readLock().unlock();
}
}
- protected final ClusterMember getClusterMember(final String memberId) {
+ protected final ChannelMember getClusterMember(final String memberId) {
m_clusterMembersLock.readLock().lock();
try {
return m_clusterMembers.get(memberId);
@@ -158,16 +157,16 @@
}
}
- protected final void addClusterMember(ClusterMember clusterMember) {
+ protected final void addClusterMember(ChannelMember clusterMember) {
m_clusterMembersLock.writeLock().lock();
try {
- m_clusterMembers.put(clusterMember.getMemberId(), clusterMember);
+ m_clusterMembers.put(clusterMember.getName(), clusterMember);
}
finally {
m_clusterMembersLock.writeLock().unlock();
}
Dictionary<String, Object> props = new Hashtable<String, Object>();
- props.put(EVENT_MESSAGE_PROPERTY, new
MemberAddedEvent(clusterMember.getMemberId()));
+ props.put(EVENT_MESSAGE_PROPERTY, new
ChannelMemberAddedEvent(clusterMember.getName()));
Event broadCastEvent = new Event(m_recieveEventTopic, props);
m_eventAdmin.postEvent(broadCastEvent);
}
@@ -181,12 +180,12 @@
m_clusterMembersLock.writeLock().unlock();
}
Dictionary<String, Object> props = new Hashtable<String, Object>();
- props.put(EVENT_MESSAGE_PROPERTY, new MemberRemovedEvent(memberId));
+ props.put(EVENT_MESSAGE_PROPERTY, new
ChannelMemberRemovedEvent(memberId));
Event broadCastEvent = new Event(m_recieveEventTopic, props);
m_eventAdmin.postEvent(broadCastEvent);
}
- protected final void dispatchMessage(Object message, ClusterMember sender)
{
+ protected final void dispatchMessage(Object message, ChannelMember sender)
{
Dictionary<String, Object> props = new Hashtable<String, Object>();
props.put(EVENT_MESSAGE_PROPERTY, message);
if (message instanceof LocalTopicMessage) {
@@ -213,7 +212,7 @@
protected abstract void doBroadcast(Object message);
- protected abstract void doSend(ClusterMember[] clusterMember, Object
message);
+ protected abstract void doSend(ChannelMember[] clusterMember, Object
message);
/********************************************************
* Private methods
@@ -224,9 +223,9 @@
Dictionary<String, Object> serviceProps =
m_component.getServiceProperties();
if (serviceProps == null)
serviceProps = new Hashtable<String, Object>();
- serviceProps.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY,
m_clusterId);
- serviceProps.put(ClusterMemberService.SERVICE_CLUSTERMEMBER_PROPERTY,
m_memberId);
- serviceProps.put(ClusterMemberService.SERVICE_CONFIGURATION_PROPERTY,
m_properties);
+
serviceProps.put(ClusterChannelService.SERVICE_CLUSTERCHANNEL_PROPERTY,
m_clusterId);
+ serviceProps.put(ClusterChannelService.SERVICE_CLUSTERMEMBER_PROPERTY,
m_memberId);
+ serviceProps.put(ClusterChannelService.SERVICE_CONFIGURATION_PROPERTY,
m_properties);
m_component.setServiceProperties(serviceProps);
}
@@ -267,17 +266,17 @@
return;
}
RoutableMessage routableMessage = (RoutableMessage) message;
- routableMessage.setOriginClusterId(m_clusterId);
- routableMessage.setOriginMemberId(m_memberId);
- routableMessage.setTargetClusterId(m_clusterId);
- if (routableMessage.getTargetMemberId() != null) {
- ClusterMember clusterMember =
getClusterMember(routableMessage.getTargetMemberId());
+ routableMessage.setFromClusterChannel(m_clusterId);
+ routableMessage.setFromChannelMember(m_memberId);
+ routableMessage.setToClusterChannel(m_clusterId);
+ if (routableMessage.getToChannelMember() != null) {
+ ChannelMember clusterMember =
getClusterMember(routableMessage.getToChannelMember());
if (clusterMember == null) {
m_logService.log(LogService.LOG_ERROR, "RoutedMessage
specifies unknown target member: "
+ routableMessage.toString());
return;
}
- doSend(new ClusterMember[] { clusterMember }, message);
+ doSend(new ChannelMember[] { clusterMember }, message);
}
else {
doBroadcast(message);
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java
Tue Jan 11 10:49:48 2011
@@ -27,9 +27,9 @@
import java.util.Properties;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.amdatu.core.fabric.cluster.ClusterMember;
-import org.amdatu.core.fabric.cluster.internal.ClusterMemberImpl;
-import org.amdatu.core.fabric.cluster.internal.tribes.ChannelBuilder;
+import org.amdatu.core.fabric.cluster.ChannelMember;
+import org.amdatu.core.fabric.cluster.internal.ChannelMemberImpl;
+import org.amdatu.core.fabric.cluster.internal.tribes.ClusterChannelCreator;
import org.amdatu.core.fabric.cluster.service.ClusterMemberServiceBase;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
@@ -73,7 +73,7 @@
getLogService().log(LogService.LOG_DEBUG, "Starting managed channel");
try {
m_managedChannel =
- (ManagedChannel) ChannelBuilder
+ (ManagedChannel) ClusterChannelCreator
.createChannel(getClusterId(), getProperties());
Properties props = new Properties();
@@ -126,7 +126,7 @@
}
@Override
- public void doSend(ClusterMember[] clusterMembers, Object message) {
+ public void doSend(ChannelMember[] clusterMembers, Object message) {
if (!(message instanceof Serializable)) {
getLogService().log(LogService.LOG_ERROR,
"Dropping message during broadcast because is is not
Serializable: "
@@ -136,8 +136,8 @@
List<Member> memberList = new LinkedList<Member>();
m_membersLock.readLock().lock();
try {
- for (ClusterMember clusterMember : clusterMembers) {
- Member member = m_members.get(clusterMember.getMemberId());
+ for (ChannelMember clusterMember : clusterMembers) {
+ Member member = m_members.get(clusterMember.getName());
if (member != null) {
memberList.add(member);
}
@@ -177,7 +177,7 @@
finally {
m_membersLock.writeLock().unlock();
}
- super.addClusterMember(new ClusterMemberImpl(memberId));
+ super.addClusterMember(new ChannelMemberImpl(memberId));
}
private void memberDisappeared(Member member) {
@@ -231,7 +231,7 @@
public void messageReceived(Serializable message, Member member) {
String memberId = getMemberId(member);
- ClusterMember clusterMember = getClusterMember(memberId);
+ ChannelMember clusterMember = getClusterMember(memberId);
if (clusterMember == null) {
getLogService().log(LogService.LOG_ERROR,
"Dropping message for active member: " +
message.toString());
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDepublishMessage.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDepublishMessage.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDepublishMessage.java
Tue Jan 11 10:49:48 2011
@@ -36,8 +36,8 @@
}
public ServiceEndPoint getServiceEndPoint() {
- m_serviceEndPoint.setClusterId(getOriginClusterId());
- m_serviceEndPoint.setMemberId(getOriginMemberId());
+ m_serviceEndPoint.setClusterId(getFromClusterChannel());
+ m_serviceEndPoint.setMemberId(getFromChannelMember());
return m_serviceEndPoint;
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDiscoveryMessage.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDiscoveryMessage.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDiscoveryMessage.java
Tue Jan 11 10:49:48 2011
@@ -47,7 +47,7 @@
Matcher m1 = p.matcher(super.toString());
sb.append("\n\tsuper=" + m1.replaceAll("\n\t"));
sb.append("\n\tclusterId=" + m_clusterId);
- sb.append("\n\tmemberId=" + getOriginMemberId());
+ sb.append("\n\tmemberId=" + getFromChannelMember());
sb.append("\n\tserviceGroup=" + m_serviceGroup);
sb.append("\n}");
return sb.toString();
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
Tue Jan 11 10:49:48 2011
@@ -47,8 +47,8 @@
}
public String getLocalTopic() {
- return DistributionUtilities.getLocalInvokeTopic(getTargetClusterId(),
- getTargetServiceGroup());
+ return DistributionUtilities.getLocalInvokeTopic(getToClusterChannel(),
+ getToServiceGroup());
}
@Override
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointPublishMessage.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointPublishMessage.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointPublishMessage.java
Tue Jan 11 10:49:48 2011
@@ -36,8 +36,8 @@
}
public ServiceEndPoint getServiceEndPoint() {
- m_serviceEndPoint.setClusterId(getOriginClusterId());
- m_serviceEndPoint.setMemberId(getOriginMemberId());
+ m_serviceEndPoint.setClusterId(getFromClusterChannel());
+ m_serviceEndPoint.setMemberId(getFromChannelMember());
return m_serviceEndPoint;
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
Tue Jan 11 10:49:48 2011
@@ -42,8 +42,8 @@
}
public String getLocalTopic() {
- return
DistributionUtilities.getLocalResponseTopic(getTargetClusterId(),
- getTargetServiceGroup());
+ return
DistributionUtilities.getLocalResponseTopic(getToClusterChannel(),
+ getToServiceGroup());
}
@Override
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
Tue Jan 11 10:49:48 2011
@@ -28,8 +28,8 @@
import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.amdatu.core.fabric.cluster.ClusterMemberService;
-import org.amdatu.core.fabric.cluster.internal.ClusterMemberUtilities;
+import org.amdatu.core.fabric.cluster.ClusterChannelService;
+import org.amdatu.core.fabric.cluster.internal.ClusterUtilities;
import org.amdatu.core.fabric.remote.ServiceEndPoint;
import org.amdatu.core.fabric.remote.service.DistributionServiceImpl;
import org.apache.felix.dm.Component;
@@ -85,7 +85,7 @@
m_serviceInterfaceMethods.add(serviceMethod);
}
}
- m_clusterChannelInvokeTopic =
ClusterMemberUtilities.getClusterChannelSendTopic(m_clusterGroupId);
+ m_clusterChannelInvokeTopic =
ClusterUtilities.getClusterChannelSendTopic(m_clusterGroupId);
}
/********************************************************
@@ -144,10 +144,10 @@
Map<String, Object> messagePayload =
getInvocationPayload(invocationIdentifier, method, args);
EndpointInvokeMessage message = new
EndpointInvokeMessage(m_serviceEndpoint, messagePayload);
// FIXME this is awkward
- message.setOriginServcieGroup(m_serviceGroupId);
+ message.setFromServiceGroup(m_serviceGroupId);
Dictionary<String, Object> eventPayload = new Hashtable<String,
Object>();
- eventPayload.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY,
message);
+ eventPayload.put(ClusterChannelService.EVENT_MESSAGE_PROPERTY,
message);
Event event = new Event(m_clusterChannelInvokeTopic, eventPayload);
m_eventAdmin.postEvent(event);
@@ -294,7 +294,7 @@
class ServiceInvocationEventHandler implements EventHandler {
public void handleEvent(Event event) {
- Object message =
event.getProperty(ClusterMemberService.EVENT_MESSAGE_PROPERTY);
+ Object message =
event.getProperty(ClusterChannelService.EVENT_MESSAGE_PROPERTY);
if (message instanceof EndpointResponseMessage) {
EndpointResponseMessage endpointResponseMessage =
(EndpointResponseMessage) message;
Map<String, Object> payload =
endpointResponseMessage.getPayload();
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
Tue Jan 11 10:49:48 2011
@@ -26,10 +26,10 @@
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.amdatu.core.fabric.cluster.ClusterMemberService;
-import org.amdatu.core.fabric.cluster.MemberAddedEvent;
-import org.amdatu.core.fabric.cluster.MemberRemovedEvent;
-import org.amdatu.core.fabric.cluster.internal.ClusterMemberUtilities;
+import org.amdatu.core.fabric.cluster.ClusterChannelService;
+import org.amdatu.core.fabric.cluster.ChannelMemberAddedEvent;
+import org.amdatu.core.fabric.cluster.ChannelMemberRemovedEvent;
+import org.amdatu.core.fabric.cluster.internal.ClusterUtilities;
import org.amdatu.core.fabric.remote.DiscoveryService;
import org.amdatu.core.fabric.remote.RemotableServiceEndpoint;
import org.amdatu.core.fabric.remote.RemoteServiceEndPoint;
@@ -93,7 +93,7 @@
if (discoveryProps == null) {
discoveryProps = new Hashtable<String, Object>();
}
-
discoveryProps.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY,
m_clusterGroupId);
+
discoveryProps.put(ClusterChannelService.SERVICE_CLUSTERCHANNEL_PROPERTY,
m_clusterGroupId);
discoveryProps.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP,
m_serviceGroupId);
m_component.setServiceProperties(discoveryProps);
@@ -106,8 +106,8 @@
ServiceDependency clusterMemberDependency =
m_dependencyManager.createServiceDependency();
clusterMemberDependency
.setService(
- ClusterMemberService.class,
- "(" + ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY +
"=" + m_clusterGroupId + ")")
+ ClusterChannelService.class,
+ "(" + ClusterChannelService.SERVICE_CLUSTERCHANNEL_PROPERTY +
"=" + m_clusterGroupId + ")")
.setRequired(true);
clusterMemberDependency.setInstanceBound(true);
m_component.add(clusterMemberDependency);
@@ -121,7 +121,7 @@
ServiceDependency remotableServiceEndpointsDependecy =
m_dependencyManager.createServiceDependency();
remotableServiceEndpointsDependecy
.setService(RemotableServiceEndpoint.class,
- "(&(" +
ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY + "="
+ "(&(" +
ClusterChannelService.SERVICE_CLUSTERCHANNEL_PROPERTY + "="
+ m_clusterGroupId + ")(" +
DiscoveryService.REMOTE_SERVICEGROUPID_PROP + "="
+ m_serviceGroupId + "))")
.setCallbacks("remotableServiceEndPointAdded",
"remotableServiceEndPointRemoved")
@@ -131,7 +131,7 @@
Dictionary<String, Object> eventHandlerProps = new Hashtable<String,
Object>();
eventHandlerProps.put(EventConstants.EVENT_TOPIC,
new String[] {
DiscoveryUtilities.getLocalDiscoveryTopic(m_clusterGroupId, m_serviceGroupId),
-
ClusterMemberUtilities.getClusterChannelReceiveTopic(m_clusterGroupId) });
+
ClusterUtilities.getClusterChannelReceiveTopic(m_clusterGroupId) });
m_discoveryEventHandlerComponent =
m_dependencyManager.createComponent();
m_discoveryEventHandlerComponent.setInterface(EventHandler.class.getName(),
eventHandlerProps);
m_discoveryEventHandlerComponent.setImplementation(new
DiscoveryEventHandler());
@@ -199,20 +199,20 @@
* Private methods
********************************************************/
- private Event createDiscoveryEvent(MemberAddedEvent memberAddedEvent) {
+ private Event createDiscoveryEvent(ChannelMemberAddedEvent
memberAddedEvent) {
EndpointDiscoveryMessage endpointDiscoveryMessage =
new EndpointDiscoveryMessage(m_clusterGroupId, m_serviceGroupId);
if (memberAddedEvent != null) {
- endpointDiscoveryMessage.setTargetClusterId(m_clusterGroupId);
-
endpointDiscoveryMessage.setTargetMemberId(memberAddedEvent.getMemberId());
+ endpointDiscoveryMessage.setToClusterChannel(m_clusterGroupId);
+
endpointDiscoveryMessage.setToChannelMember(memberAddedEvent.getMemberName());
}
Dictionary<String, Object> eventProps = new Hashtable<String,
Object>();
- eventProps.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY,
+ eventProps.put(ClusterChannelService.EVENT_MESSAGE_PROPERTY,
new EndpointDiscoveryMessage(m_clusterGroupId, m_serviceGroupId));
Event discoveryEvent =
- new
Event(ClusterMemberUtilities.getClusterChannelSendTopic(m_clusterGroupId),
+ new
Event(ClusterUtilities.getClusterChannelSendTopic(m_clusterGroupId),
eventProps);
return discoveryEvent;
}
@@ -223,22 +223,22 @@
EndpointPublishMessage endpointPublishMessage = new
EndpointPublishMessage(serviceEndPoint);
if (endpointDiscoveryMessage != null) {
-
endpointPublishMessage.setTargetClusterId(endpointDiscoveryMessage.getOriginClusterId());
-
endpointPublishMessage.setTargetMemberId(endpointDiscoveryMessage.getOriginMemberId());
+
endpointPublishMessage.setToClusterChannel(endpointDiscoveryMessage.getFromClusterChannel());
+
endpointPublishMessage.setToChannelMember(endpointDiscoveryMessage.getFromChannelMember());
}
- props.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY,
endpointPublishMessage);
+ props.put(ClusterChannelService.EVENT_MESSAGE_PROPERTY,
endpointPublishMessage);
Event event =
- new
Event(ClusterMemberUtilities.getClusterChannelSendTopic(m_clusterGroupId),
+ new
Event(ClusterUtilities.getClusterChannelSendTopic(m_clusterGroupId),
props);
return event;
}
private Event createEndpointDepublishEvent(ServiceEndPoint
serviceEndPoint) {
Dictionary<String, Object> props = new Hashtable<String, Object>();
- props.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY, new
EndpointDepublishMessage(serviceEndPoint));
+ props.put(ClusterChannelService.EVENT_MESSAGE_PROPERTY, new
EndpointDepublishMessage(serviceEndPoint));
Event event =
- new
Event(ClusterMemberUtilities.getClusterChannelSendTopic(m_clusterGroupId),
+ new
Event(ClusterUtilities.getClusterChannelSendTopic(m_clusterGroupId),
props);
return event;
}
@@ -296,11 +296,11 @@
return;
}
Dictionary<String, Object> distributionProps = new
Hashtable<String, Object>();
-
distributionProps.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY,
+
distributionProps.put(ClusterChannelService.SERVICE_CLUSTERCHANNEL_PROPERTY,
m_clusterGroupId);
distributionProps.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP,
m_serviceGroupId);
-
distributionProps.put(ClusterMemberService.SERVICE_CLUSTERMEMBER_PROPERTY,
+
distributionProps.put(ClusterChannelService.SERVICE_CLUSTERMEMBER_PROPERTY,
serviceEndPoint.getMemberId());
serviceComponent =
m_dependencyManager.createComponent()
@@ -308,7 +308,7 @@
.setImplementation(new
RemoteServiceEndPointImpl(serviceEndPoint));
serviceComponent.add(m_dependencyManager.createServiceDependency().setService(
DiscoveryService.class,
- "(&(" + ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY +
"="
+ "(&(" + ClusterChannelService.SERVICE_CLUSTERCHANNEL_PROPERTY
+ "="
+ m_clusterGroupId + ")(" +
DiscoveryService.REMOTE_SERVICEGROUPID_PROP + "="
+ m_serviceGroupId + "))")
.setRequired(true));
@@ -323,19 +323,19 @@
return;
}
- private void recieveMemberAddedEvent(final MemberAddedEvent
memberAddedEvent) {
+ private void recieveMemberAddedEvent(final ChannelMemberAddedEvent
memberAddedEvent) {
m_logService.log(LogService.LOG_ERROR, "Recieved \n" +
memberAddedEvent.toString());
m_evenAdmin.postEvent(createDiscoveryEvent(memberAddedEvent));
}
- private void recieveMemberRemovedEvent(final MemberRemovedEvent
memberRemovedEvent) {
+ private void recieveMemberRemovedEvent(final ChannelMemberRemovedEvent
memberRemovedEvent) {
m_logService.log(LogService.LOG_ERROR, "Recieved \n" +
memberRemovedEvent.toString());
List<ServiceEndPoint> removeServiceEndpointList = null;
List<Component> removeServiceComponentList = null;
m_remoteEndPointComponentsLock.writeLock().lock();
try {
for (ServiceEndPoint serviceEndPoint :
m_remoteEndPointComponents.keySet()) {
- if
(serviceEndPoint.getMemberId().equals(memberRemovedEvent.getMemberId())) {
+ if
(serviceEndPoint.getMemberId().equals(memberRemovedEvent.getMemberName())) {
if (removeServiceEndpointList == null)
removeServiceEndpointList = new
LinkedList<ServiceEndPoint>();
removeServiceEndpointList.add(serviceEndPoint);
@@ -367,7 +367,7 @@
class DiscoveryEventHandler implements EventHandler {
public void handleEvent(Event event) {
- Object message =
event.getProperty(ClusterMemberService.EVENT_MESSAGE_PROPERTY);
+ Object message =
event.getProperty(ClusterChannelService.EVENT_MESSAGE_PROPERTY);
if (message instanceof EndpointPublishMessage) {
recieveEndpointPublishMessage((EndpointPublishMessage)
message);
return;
@@ -380,12 +380,12 @@
recieveEndpointDiscoveryMessage((EndpointDiscoveryMessage)
message);
return;
}
- if (message instanceof MemberAddedEvent) {
- recieveMemberAddedEvent((MemberAddedEvent) message);
+ if (message instanceof ChannelMemberAddedEvent) {
+ recieveMemberAddedEvent((ChannelMemberAddedEvent) message);
return;
}
- if (message instanceof MemberRemovedEvent) {
- recieveMemberRemovedEvent((MemberRemovedEvent) message);
+ if (message instanceof ChannelMemberRemovedEvent) {
+ recieveMemberRemovedEvent((ChannelMemberRemovedEvent) message);
return;
}
throw new IllegalStateException("Unknown message type " +
message.getClass().getName() + "on channel "
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DistributionServiceImpl.java
Tue Jan 11 10:49:48 2011
@@ -25,8 +25,8 @@
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.amdatu.core.fabric.cluster.ClusterMemberService;
-import org.amdatu.core.fabric.cluster.internal.ClusterMemberUtilities;
+import org.amdatu.core.fabric.cluster.ClusterChannelService;
+import org.amdatu.core.fabric.cluster.internal.ClusterUtilities;
import org.amdatu.core.fabric.remote.DiscoveryService;
import org.amdatu.core.fabric.remote.DistributionService;
import org.amdatu.core.fabric.remote.RemotableServiceEndpoint;
@@ -98,7 +98,7 @@
if (distributionProps == null) {
distributionProps = new Hashtable<String, Object>();
}
-
distributionProps.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY,
m_clusterGroupId);
+
distributionProps.put(ClusterChannelService.SERVICE_CLUSTERCHANNEL_PROPERTY,
m_clusterGroupId);
distributionProps.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP,
m_serviceGroupId);
distributionProps.put(DistributionService.DISTRIBUTION_CONFIGS_SUPPORTED_PROP,
DistributionService.DISTRIBUTION_CONFIGS_SUPPORTED);
@@ -115,8 +115,8 @@
ServiceDependency clusterMemberDependency =
m_dependencyManager.createServiceDependency();
clusterMemberDependency
.setService(
- ClusterMemberService.class,
- "(" + ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY +
"=" + m_clusterGroupId + ")")
+ ClusterChannelService.class,
+ "(" + ClusterChannelService.SERVICE_CLUSTERCHANNEL_PROPERTY +
"=" + m_clusterGroupId + ")")
.setRequired(true);
clusterMemberDependency.setInstanceBound(true);
m_component.add(clusterMemberDependency);
@@ -140,7 +140,7 @@
ServiceDependency remoteServiceEndpointsDependecy =
m_dependencyManager.createServiceDependency();
remoteServiceEndpointsDependecy
.setService(RemoteServiceEndPoint.class,
- "(&(" +
ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY + "="
+ "(&(" +
ClusterChannelService.SERVICE_CLUSTERCHANNEL_PROPERTY + "="
+ m_clusterGroupId + ")(" +
DiscoveryService.REMOTE_SERVICEGROUPID_PROP + "="
+ m_serviceGroupId + "))")
.setCallbacks("remoteServiceEndPointAdded",
"remoteServiceEndPointRemoved")
@@ -313,7 +313,7 @@
private Component createRemotableEndPointComponent(final ServiceEndPoint
serviceEndPoint) {
Dictionary<String, Object> distributionProps = new Hashtable<String,
Object>();
-
distributionProps.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY,
m_clusterGroupId);
+
distributionProps.put(ClusterChannelService.SERVICE_CLUSTERCHANNEL_PROPERTY,
m_clusterGroupId);
distributionProps.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP,
m_serviceGroupId);
Component serviceComponent =
m_dependencyManager.createComponent()
@@ -324,7 +324,7 @@
.createServiceDependency()
.setService(
DistributionService.class,
- "(&(" +
ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY + "="
+ "(&(" +
ClusterChannelService.SERVICE_CLUSTERCHANNEL_PROPERTY + "="
+ m_clusterGroupId + ")(" +
DiscoveryService.REMOTE_SERVICEGROUPID_PROP + "="
+ m_serviceGroupId + "))")
.setRequired(true);
@@ -361,7 +361,7 @@
props.remove(DistributionService.SERVICE_EXPORTED_INTERFACES_PROP);
props.remove(DistributionService.SERVICE_EXPORTED_INTENTS_PROP);
props.remove(DistributionService.SERVICE_EXPORTED_INTENTS_EXTRA_PROP);
- props.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY,
m_clusterGroupId);
+ props.put(ClusterChannelService.SERVICE_CLUSTERCHANNEL_PROPERTY,
m_clusterGroupId);
props.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP,
m_serviceGroupId);
props.put(DistributionService.SERVICE_IMPORTED_PROP, "true");
props.put(DistributionService.SERVICE_INTENTS_PROP, importedIntents);
@@ -375,7 +375,7 @@
.createServiceDependency()
.setService(
DistributionService.class,
- "(&(" +
ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY + "="
+ "(&(" +
ClusterChannelService.SERVICE_CLUSTERCHANNEL_PROPERTY + "="
+ m_clusterGroupId + ")(" +
DiscoveryService.REMOTE_SERVICEGROUPID_PROP + "="
+ m_serviceGroupId + "))")
.setRequired(true);
@@ -520,20 +520,20 @@
private Event createEndpointResponseEvent(EndpointInvokeMessage
endpointInvokeMessage, Object serviceResponse) {
Map<String, Object> payload = endpointInvokeMessage.getPayload();
String invocationId = (String) payload.get(MESSAGE_INVOCATION_ID_KEY);
- String originClusterId = (String)
endpointInvokeMessage.getOriginClusterId();
- String originMemberId = (String)
endpointInvokeMessage.getOriginMemberId();
- String originServiceGroup = (String)
endpointInvokeMessage.getOriginServiceGroup();
+ String originClusterId = (String)
endpointInvokeMessage.getFromClusterChannel();
+ String originMemberId = (String)
endpointInvokeMessage.getFromChannelMember();
+ String originServiceGroup = (String)
endpointInvokeMessage.getFromServiceGroup();
Map<String, Object> responsePayload = new HashMap<String, Object>();
responsePayload.put(MESSAGE_INVOCATION_ID_KEY, invocationId);
responsePayload.put(MESSAGE_INVOCATION_RESPONSE_MAP_KEY,
serviceResponse);
Dictionary<String, Object> eventPayload = new Hashtable<String,
Object>();
- eventPayload.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY, new
EndpointResponseMessage(originClusterId,
+ eventPayload.put(ClusterChannelService.EVENT_MESSAGE_PROPERTY, new
EndpointResponseMessage(originClusterId,
originMemberId, originServiceGroup, responsePayload));
Event responseEvent =
- new
Event(ClusterMemberUtilities.getClusterChannelSendTopic(m_clusterGroupId),
+ new
Event(ClusterUtilities.getClusterChannelSendTopic(m_clusterGroupId),
eventPayload);
return responseEvent;
}
@@ -545,7 +545,7 @@
class DistributionEventHandler implements EventHandler {
public void handleEvent(Event event) {
- Object message =
event.getProperty(ClusterMemberService.EVENT_MESSAGE_PROPERTY);
+ Object message =
event.getProperty(ClusterChannelService.EVENT_MESSAGE_PROPERTY);
if (message instanceof EndpointInvokeMessage) {
recieveEndpointInvokeMessage((EndpointInvokeMessage) message);
return;
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java
Tue Jan 11 10:49:48 2011
@@ -23,8 +23,8 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.amdatu.core.fabric.FabricManagerService;
-import org.amdatu.core.fabric.cluster.ClusterMemberService;
-import org.amdatu.core.fabric.cluster.internal.ClusterMemberUtilities;
+import org.amdatu.core.fabric.cluster.ClusterChannelService;
+import org.amdatu.core.fabric.cluster.internal.ClusterUtilities;
import
org.amdatu.core.fabric.cluster.service.tribes.TribesClusterMemberServiceImpl;
import org.amdatu.core.fabric.remote.DiscoveryService;
import org.amdatu.core.fabric.remote.DistributionService;
@@ -126,7 +126,7 @@
********************************************************/
public boolean createClusterChannel(String clusterChannelName,
Dictionary<String, Object> clusterChannelProperties) {
- if (!ClusterMemberUtilities.isValidChannelName(clusterChannelName)) {
+ if (!ClusterUtilities.isValidChannelName(clusterChannelName)) {
m_logService.log(LogService.LOG_ERROR, "Failed to create
clusterchannel. Invalid clusterChannelName: "
+ clusterChannelName);
return false;
@@ -135,7 +135,7 @@
Component clusterMemberComponent =
m_dependencyManager
.createComponent()
- .setInterface(ClusterMemberService.class.getName(),
clusterChannelProperties)
+ .setInterface(ClusterChannelService.class.getName(),
clusterChannelProperties)
.setFactory(new
ClusterMemberServiceFactory(clusterChannelName, clusterChannelProperties),
"getInstance")
.add(
@@ -160,7 +160,7 @@
}
public boolean removeClusterChannel(String clusterGroupId) {
- if (!ClusterMemberUtilities.isValidChannelName(clusterGroupId)) {
+ if (!ClusterUtilities.isValidChannelName(clusterGroupId)) {
m_logService.log(LogService.LOG_ERROR, "Failed to remove
clusterchannel. Invalid clusterChannelName: "
+ clusterGroupId);
return false;
@@ -180,13 +180,13 @@
}
public boolean createDiscovery(String clusterChannelName, String
serviceGroupName) {
- if (!ClusterMemberUtilities.isValidChannelName(clusterChannelName)) {
+ if (!ClusterUtilities.isValidChannelName(clusterChannelName)) {
m_logService
.log(LogService.LOG_ERROR, "Failed to create discovery.
Invalid clusterChannelName: "
+ clusterChannelName);
return false;
}
- if (!ClusterMemberUtilities.isValidChannelName(serviceGroupName)) {
+ if (!ClusterUtilities.isValidChannelName(serviceGroupName)) {
m_logService.log(LogService.LOG_ERROR, "Failed to create
discovery. Invalid serviceGroupName: "
+ serviceGroupName);
return false;
@@ -219,13 +219,13 @@
}
public boolean removeDiscovery(String clusterChannelName, String
serviceGroupName) {
- if (!ClusterMemberUtilities.isValidChannelName(clusterChannelName)) {
+ if (!ClusterUtilities.isValidChannelName(clusterChannelName)) {
m_logService
.log(LogService.LOG_ERROR, "Failed to remove discovery.
Invalid clusterChannelName: "
+ clusterChannelName);
return false;
}
- if (!ClusterMemberUtilities.isValidChannelName(serviceGroupName)) {
+ if (!ClusterUtilities.isValidChannelName(serviceGroupName)) {
m_logService.log(LogService.LOG_ERROR, "Failed to remove
discovery. Invalid serviceGroupName: "
+ serviceGroupName);
return false;
@@ -245,12 +245,12 @@
}
public boolean createDistribution(String clusterChannelName, String
serviceGroupName) {
- if (!ClusterMemberUtilities.isValidChannelName(clusterChannelName)) {
+ if (!ClusterUtilities.isValidChannelName(clusterChannelName)) {
m_logService
.log(LogService.LOG_ERROR, "Failed to create distribution.
Invalid clustername: " + clusterChannelName);
return false;
}
- if (!ClusterMemberUtilities.isValidChannelName(serviceGroupName)) {
+ if (!ClusterUtilities.isValidChannelName(serviceGroupName)) {
m_logService.log(LogService.LOG_ERROR, "Failed to remove
distribution. Invalid servicegroupname: "
+ serviceGroupName);
return false;
@@ -282,13 +282,13 @@
}
public boolean removeDistribution(String clusterChannelName, String
serviceGroupName) {
- if (!ClusterMemberUtilities.isValidChannelName(clusterChannelName)) {
+ if (!ClusterUtilities.isValidChannelName(clusterChannelName)) {
m_logService
.log(LogService.LOG_ERROR, "Failed to create distribution.
Invalid clusterChannelname: "
+ clusterChannelName);
return false;
}
- if (!ClusterMemberUtilities.isValidChannelName(serviceGroupName)) {
+ if (!ClusterUtilities.isValidChannelName(serviceGroupName)) {
m_logService.log(LogService.LOG_ERROR, "Failed to remove
distribution. Invalid serviceGroupName: "
+ serviceGroupName);
return false;
@@ -330,7 +330,7 @@
m_clusterChannelProperties = clusterChannelProperties;
}
- public ClusterMemberService getInstance() {
+ public ClusterChannelService getInstance() {
return new TribesClusterMemberServiceImpl(m_clusterChannelName,
m_clusterChannelProperties);
}
}
Modified:
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/ChannelMemberUtilitiesTest.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/ChannelMemberUtilitiesTest.java
(original)
+++
sandbox/bdekruijff/fabric/src/test/java/org/amdatu/core/fabric/remote/service/ChannelMemberUtilitiesTest.java
Tue Jan 11 10:49:48 2011
@@ -16,7 +16,7 @@
*/
package org.amdatu.core.fabric.remote.service;
-import org.amdatu.core.fabric.cluster.internal.ClusterMemberUtilities;
+import org.amdatu.core.fabric.cluster.internal.ClusterUtilities;
import org.junit.Assert;
import org.junit.Test;
@@ -24,20 +24,20 @@
@Test
public void testIsValidChannelName() {
- Assert.assertTrue(ClusterMemberUtilities.isValidChannelName("a"));
- Assert.assertTrue(ClusterMemberUtilities.isValidChannelName("hello"));
-
Assert.assertTrue(ClusterMemberUtilities.isValidChannelName("hello_world"));
-
Assert.assertTrue(ClusterMemberUtilities.isValidChannelName("hello-world"));
-
Assert.assertTrue(ClusterMemberUtilities.isValidChannelName("hello_123"));
-
Assert.assertTrue(ClusterMemberUtilities.isValidChannelName("123hello_"));
+ Assert.assertTrue(ClusterUtilities.isValidChannelName("a"));
+ Assert.assertTrue(ClusterUtilities.isValidChannelName("hello"));
+ Assert.assertTrue(ClusterUtilities.isValidChannelName("hello_world"));
+ Assert.assertTrue(ClusterUtilities.isValidChannelName("hello-world"));
+ Assert.assertTrue(ClusterUtilities.isValidChannelName("hello_123"));
+ Assert.assertTrue(ClusterUtilities.isValidChannelName("123hello_"));
- Assert.assertFalse(ClusterMemberUtilities.isValidChannelName(null));
- Assert.assertFalse(ClusterMemberUtilities.isValidChannelName(""));
- Assert.assertFalse(ClusterMemberUtilities.isValidChannelName(" "));
- Assert.assertFalse(ClusterMemberUtilities.isValidChannelName("
hello"));
- Assert.assertFalse(ClusterMemberUtilities.isValidChannelName("hello
"));
-
Assert.assertFalse(ClusterMemberUtilities.isValidChannelName("hello.world"));
-
Assert.assertFalse(ClusterMemberUtilities.isValidChannelName("hello/world"));
+ Assert.assertFalse(ClusterUtilities.isValidChannelName(null));
+ Assert.assertFalse(ClusterUtilities.isValidChannelName(""));
+ Assert.assertFalse(ClusterUtilities.isValidChannelName(" "));
+ Assert.assertFalse(ClusterUtilities.isValidChannelName(" hello"));
+ Assert.assertFalse(ClusterUtilities.isValidChannelName("hello "));
+ Assert.assertFalse(ClusterUtilities.isValidChannelName("hello.world"));
+ Assert.assertFalse(ClusterUtilities.isValidChannelName("hello/world"));
}
}