Author: bdekruijff at gmail.com
Date: Tue Jan 11 10:25:35 2011
New Revision: 582
Log:
[sandbox] mcast disabling. support / static member support / member events /
more routing / robustness and fixes
Added:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/MemberAddedEvent.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/MemberRemovedEvent.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ChannelBuilder.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/NoMcastMembershipServiceImpl.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/StaticMemberMonitorInterceptor.java
Removed:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ChannelCreator.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/IntrospectionUtils.java
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/FabricManagerService.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMemberServiceBase.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/DiscoveryUtilities.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDepublishMessage.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDiscoveryMessage.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointPublishMessage.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/FabricManagerService.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/FabricManagerService.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/FabricManagerService.java
Tue Jan 11 10:25:35 2011
@@ -6,9 +6,9 @@
String CONFIGURATION_PID = "org.amdatu.core.fabric";
- boolean createClusterMember(String clusterGroupId, String clusterMemberId,
Dictionary<String, Object> properties);
+ boolean createClusterChannel(String clusterGroupId, Dictionary<String,
Object> properties);
- boolean removeClusterMember(String clusterGroupId, String clusterMemberId);
+ boolean removeClusterChannel(String clusterGroupId);
boolean createDiscovery(String clusterGroupId, String serviceGroupId);
Added:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/MemberAddedEvent.java
==============================================================================
--- (empty file)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/MemberAddedEvent.java
Tue Jan 11 10:25:35 2011
@@ -0,0 +1,22 @@
+package org.amdatu.core.fabric.cluster;
+
+public class MemberAddedEvent {
+
+ private final String m_memberId;
+
+ public MemberAddedEvent(final String memberId) {
+ m_memberId = memberId;
+ }
+
+ public String getMemberId() {
+ return m_memberId;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new
StringBuilder(MemberAddedEvent.class.getSimpleName() + "{");
+ sb.append("\n\tmemberId: " + m_memberId);
+ sb.append("\n}");
+ return sb.toString();
+ }
+}
Added:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/MemberRemovedEvent.java
==============================================================================
--- (empty file)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/MemberRemovedEvent.java
Tue Jan 11 10:25:35 2011
@@ -0,0 +1,22 @@
+package org.amdatu.core.fabric.cluster;
+
+public class MemberRemovedEvent {
+
+ private final String m_memberId;
+
+ public MemberRemovedEvent(final String memberId) {
+ m_memberId = memberId;
+ }
+
+ public String getMemberId() {
+ return m_memberId;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new
StringBuilder(MemberRemovedEvent.class.getSimpleName() + "{");
+ sb.append("\n\tmemberId: " + m_memberId);
+ sb.append("\n}");
+ return sb.toString();
+ }
+}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/RoutableMessage.java
Tue Jan 11 10:25:35 2011
@@ -97,4 +97,17 @@
public final void setTargetServiceGroup(String serviceGroup) {
m_targetServiceGroup = serviceGroup;
}
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("RoutableMessage{");
+ sb.append("\n\toriginClusterId=" + m_originClusterId);
+ sb.append("\n\toriginMemberId=" + m_originMemberId);
+ sb.append("\n\toriginServiceGroup=" + m_originServiceGroup);
+ sb.append("\n\ttargetClusterId=" + m_targetClusterId);
+ sb.append("\n\ttargetMemberId=" + m_targetMemberId);
+ sb.append("\n\ttargetServiceGroup=" + m_targetServiceGroup);
+ sb.append("\n}");
+ return sb.toString();
+ }
}
\ No newline at end of file
Added:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ChannelBuilder.java
==============================================================================
--- (empty file)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/ChannelBuilder.java
Tue Jan 11 10:25:35 2011
@@ -0,0 +1,220 @@
+/*
+ Copyright (C) 2010 Amdatu.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.amdatu.core.fabric.cluster.internal.tribes;
+
+import java.io.IOException;
+import java.util.Dictionary;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ManagedChannel;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipService;
+import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.membership.McastService;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.transport.ReplicationTransmitter;
+import org.apache.catalina.tribes.transport.bio.PooledMultiSender;
+import org.apache.catalina.tribes.transport.nio.NioReceiver;
+
+/**
+ * Build a Apache Tribes based cluster channel specific to Amdatu clusters.
+ *
+ * @see
http://svn.apache.org/repos/asf/tomcat/tc7.0.x/tags/TOMCAT_7_0_5/test/org/apache/catalina/tribes/demos/ChannelCreator.java
+ *
+ */
+public class ChannelBuilder {
+
+ /**
+ * Type of membership discovery. Default is "mcast" for dynamic discovery
and
+ * optional additional static members. To disable multicast use "static".
+ */
+ public static final String PROP_MEMBERSHIP = "membership";
+ public static final String PROP_MEMBERSHIP_DEFAULT = "mcast";
+
+ /**
+ * Comma separated list of static members that must be represented in the
form
+ * hostname:port.
+ */
+ public static final String PROP_STATICMEMBERS = "static";
+ public static final String PROP_STATICMEMBERS_DEFAULT = "";
+
+ /**
+ * Receiver component hostname. Default is "auto".
+ */
+ public static final String PROP_TCPBIND = "tcpbind";
+ public static final String PROP_TCPBIND_DEFAULT = "auto";
+
+ /**
+ * Receiver component port number. Default is "4001".
+ */
+ public static final String PROP_TCPPORT = "tcpport";
+ public static final int PROP_TCPPORT_DEFAULT = 4001;
+
+ /**
+ * Receiver component selector timeout in milliseconds for inactive
+ * connections. Default is "5000".
+ */
+ public static final String PROP_TCPSELTO = "tcpselo";
+ public static final int PROP_TCPSELTO_DEFAULT = 5000;
+
+ /**
+ * Receiver component number of threads. Default is "4".
+ */
+ public static final String PROP_TCPTHREADS = "tcpthreads";
+ public static final int PROP_TCPTHREADS_DEFAULT = 4;
+
+ /**
+ * Sender component handshake timeout in milliseconds for inactive
connections.
+ * Default is "500".
+ */
+ public static final String PROP_TCKACKTO = "tcpackto";
+ public static final int PROP_TCPACKTO_DEFAULT = 500;
+
+ /**
+ * Membership component multicast address (class D). Default is
"228.0.0.5".
+ */
+ public static final String PROP_MCASTADDR = "maddr";
+ public static final String PROP_MCASTADDR_DEFAULT = "228.0.0.5";
+
+ /**
+ * Membership component multicast port. Default value is "45565".
+ */
+ public static final String PROP_MCASTPORT = "mport";
+ public static final int PROP_MCASTPORT_DEFAULT = 45565;
+
+ /**
+ * Membership component hostname. Default value is "".
+ */
+ public static final String PROP_MCASTBIND = "mbind";
+ public static final String PROP_MCASTBIND_DEFAULT = "";
+
+ /**
+ * Membership component frequency of discovery requests in milliseconds.
+ * Default value is "500".
+ */
+ public static final String PROP_MCASTFREQ = "freq";
+ public static final int PROP_MCASTFREQ_DEFAULT = 500;
+
+ /**
+ * Membership component timeout for discovery requests in milliseconds.
+ * Default value is "2000".
+ */
+ public static final String PROP_MCASTDROP = "mdrop";
+ public static final int PROP_MCASTDROP_DEFAULT = 2000;
+
+ private static final String DOMAIN_ENCODING = "ISO-8859-1";
+
+ public static Channel createChannel(String channelName, Dictionary<String,
Object> options) throws Exception {
+
+ byte[] domain = channelName.getBytes(DOMAIN_ENCODING);
+
+ NioReceiver rx = new NioReceiver();
+ rx.setAddress(checkStringProperty(options.get(PROP_TCPBIND),
PROP_TCPBIND_DEFAULT));
+ rx.setPort(checkIntegerProperty(options.get(PROP_TCPPORT),
PROP_TCPPORT_DEFAULT));
+ rx.setSelectorTimeout(checkIntegerProperty(options.get(PROP_TCPSELTO),
PROP_TCPSELTO_DEFAULT));
+ rx.setMaxThreads(checkIntegerProperty(options.get(PROP_TCPTHREADS),
PROP_TCPTHREADS_DEFAULT));
+ rx.setMinThreads(checkIntegerProperty(options.get(PROP_TCPTHREADS),
PROP_TCPTHREADS_DEFAULT));
+ rx.getBind();
+ rx.setRxBufSize(43800);
+ rx.setTxBufSize(25188);
+ rx.setAutoBind(0);
+
+ ReplicationTransmitter tx = new ReplicationTransmitter();
+ PooledMultiSender sender = new PooledMultiSender();
+ sender.setTimeout(checkIntegerProperty(options.get(PROP_TCKACKTO),
PROP_TCPACKTO_DEFAULT));
+ sender.setMaxRetryAttempts(2);
+ sender.setRxBufSize(43800);
+ sender.setTxBufSize(25188);
+ tx.setTransport(sender);
+
+ String membership = checkStringProperty(options.get(PROP_MEMBERSHIP),
PROP_MEMBERSHIP_DEFAULT);
+ MembershipService ms;
+ if (membership.equals("static")) {
+ ms = new NoMcastMembershipServiceImpl();
+ ms.setDomain(domain);
+ }
+ else {
+ McastService mc = new McastService();
+ mc.setAddress(checkStringProperty(options.get(PROP_MCASTADDR),
PROP_MCASTADDR_DEFAULT));
+ if (checkStringProperty(options.get(PROP_MCASTBIND),
PROP_MCASTBIND_DEFAULT).equals(""))
+
mc.setMcastBindAddress(checkStringProperty(options.get(PROP_MCASTBIND),
PROP_MCASTBIND_DEFAULT));
+ mc.setFrequency(checkIntegerProperty(options.get(PROP_MCASTFREQ),
PROP_MCASTFREQ_DEFAULT));
+
mc.setMcastDropTime(checkIntegerProperty(options.get(PROP_MCASTDROP),
PROP_MCASTDROP_DEFAULT));
+ mc.setPort(checkIntegerProperty(options.get(PROP_MCASTPORT),
PROP_MCASTPORT_DEFAULT));
+ mc.setDomain(domain);
+ ms = mc;
+ }
+
+ ManagedChannel channel = new GroupChannel();
+ channel.setChannelReceiver(rx);
+ channel.setChannelSender(tx);
+ channel.setMembershipService(ms);
+
+ Member[] staticMembers =
+
memberArrayFromString(checkStringProperty(options.get(PROP_STATICMEMBERS),
PROP_STATICMEMBERS_DEFAULT),
+ domain);
+ if (staticMembers != null && staticMembers.length > 0) {
+ StaticMemberMonitorInterceptor smi = new
StaticMemberMonitorInterceptor();
+ for (int i = 0; i < staticMembers.length; i++) {
+ smi.addStaticMember(staticMembers[i]);
+ }
+ channel.addInterceptor(smi);
+ }
+ return channel;
+ }
+
+ private static String checkStringProperty(Object property, String
defaultValue) {
+ if (property == null || !(property instanceof String) ||
property.equals(""))
+ return defaultValue;
+ return (String) property;
+ }
+
+ private static int checkIntegerProperty(Object property, int defaultValue)
{
+ if (property != null) {
+ if (property instanceof String) {
+ try {
+ return Integer.parseInt((String) property);
+ }
+ catch (NumberFormatException e) {}
+ }
+ else {
+ if (property instanceof Integer) {
+ return ((Integer) property).intValue();
+
+ }
+ }
+ }
+ return defaultValue;
+ }
+
+ private static Member[] memberArrayFromString(String property, byte[]
domain) throws IOException {
+ Member[] members = null;
+ if (property != null && !property.equals("")) {
+ String[] memberparts = property.split(",");
+ members = new Member[memberparts.length];
+ for (int i = 0; i < memberparts.length; i++) {
+ String[] hostporttuples = memberparts[i].split(":");
+ MemberImpl memberimpl = new MemberImpl();
+ memberimpl.setHostname(hostporttuples[0]);
+ memberimpl.setPort(Integer.parseInt(hostporttuples[1]));
+ memberimpl.setDomain(domain);
+ members[i] = memberimpl;
+ }
+ }
+ return members;
+ }
+}
\ No newline at end of file
Added:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/NoMcastMembershipServiceImpl.java
==============================================================================
--- (empty file)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/NoMcastMembershipServiceImpl.java
Tue Jan 11 10:25:35 2011
@@ -0,0 +1,163 @@
+/*
+ Copyright (C) 2010 Amdatu.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.amdatu.core.fabric.cluster.internal.tribes;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.MembershipService;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.util.UUIDGenerator;
+
+/**
+ * Alternative to MCastService doing next to nothing for static
+ * membership only scenario. This actually does nothing besides
+ * managing the local member.
+ *
+ * @see StaticMemberMonitorInterceptor
+ *
+ */
+public class NoMcastMembershipServiceImpl implements MembershipService {
+
+ protected static final Member[] EMPTY_MEMBERS = new Member[0];
+ protected static final String[] EMPTY_MEMBERNAMES = new String[0];
+
+ protected MembershipListener m_membershipListener;
+
+ protected long m_serviceStartTime;
+
+ protected Properties m_properties = new Properties();
+
+ protected MemberImpl m_localMember;
+ protected Member[] m_members;
+
+ protected byte[] m_payload;
+
+ protected byte[] m_domain;
+
+ public Member findMemberByName(String name) {
+ return null;
+ }
+
+ public byte[] getPayload() {
+ return m_payload;
+ }
+
+ public byte[] getDomain() {
+ return m_domain;
+ }
+
+ public Member getLocalMember(boolean alive) {
+ if (alive && m_localMember != null) {
+ m_localMember.setMemberAliveTime(System.currentTimeMillis() -
m_serviceStartTime);
+ }
+ return m_localMember;
+ }
+
+ public Member getMember(Member member) {
+ return null;
+ }
+
+ public Member[] getMembers() {
+ return EMPTY_MEMBERS;
+ }
+
+ public String[] getMembersByName() {
+ return EMPTY_MEMBERNAMES;
+ }
+
+ public Properties getProperties() {
+ return m_properties;
+ }
+
+ public boolean hasMembers() {
+ return false;
+ }
+
+ public void removeMembershipListener() {
+ m_membershipListener = null;
+ }
+
+ public void setDomain(byte[] domain) {
+ m_domain = domain;
+ }
+
+ public void setLocalMemberProperties(String listenHost, int listenPort) {
+ m_properties.setProperty("tcpListenHost", listenHost);
+ m_properties.setProperty("tcpListenPort", String.valueOf(listenPort));
+ try {
+ if (m_localMember != null) {
+ m_localMember.setHostname(listenHost);
+ m_localMember.setPort(listenPort);
+ }
+ else {
+ m_localMember = new MemberImpl(listenHost, listenPort, 0);
+ m_localMember.setUniqueId(UUIDGenerator.randomUUID(true));
+ m_localMember.setPayload(getPayload());
+ m_localMember.setDomain(getDomain());
+ }
+ m_localMember.getData(true, true);
+ }
+ catch (IOException x) {
+ throw new IllegalArgumentException(x);
+ }
+ }
+
+ public void setMembershipListener(MembershipListener membershipListener) {
+ m_membershipListener = membershipListener;
+ }
+
+ public void setPayload(byte[] payload) {
+ m_payload = payload;
+ }
+
+ public void setProperties(Properties properties) {
+ this.m_properties = properties;
+ }
+
+ public void start() throws Exception {
+ start(0);
+ }
+
+ public void start(int arg0) throws Exception {
+ String host = getProperties().getProperty("tcpListenHost");
+ int port =
Integer.parseInt(getProperties().getProperty("tcpListenPort"));
+ if (m_localMember == null) {
+ m_localMember = new MemberImpl(host, port, 100);
+ m_localMember.setUniqueId(UUIDGenerator.randomUUID(true));
+ }
+ else {
+ m_localMember.setHostname(host);
+ m_localMember.setPort(port);
+ m_localMember.setMemberAliveTime(100);
+ }
+ m_serviceStartTime = System.currentTimeMillis();
+ }
+
+ public void stop(int arg0) {
+ }
+
+ public void memberAdded(Member member) {
+ m_membershipListener.memberAdded(member);
+ }
+
+ public void memberDisappeared(Member member) {
+ m_membershipListener.memberDisappeared(member);
+ }
+}
Added:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/StaticMemberMonitorInterceptor.java
==============================================================================
--- (empty file)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/internal/tribes/StaticMemberMonitorInterceptor.java
Tue Jan 11 10:25:35 2011
@@ -0,0 +1,462 @@
+/*
+ Copyright (C) 2010 Amdatu.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.amdatu.core.fabric.cluster.internal.tribes;
+
+import java.net.ConnectException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.util.Arrays;
+
+/**
+ * Tribes interceptor that actively monitors the availability and state of
configured static
+ * cluster members. This is NOT implemented as a MembershipService so that it
can operate
+ * in conjunction with the MCastMebershipService.
+ *
+ * TODO bridge to LogService
+ * TODO make timeouts configurable
+ */
+public final class StaticMemberMonitorInterceptor extends
ChannelInterceptorBase {
+
+ public static final byte[] TCP_HEARTBEAT = new byte[] {
+ 85, -10, -108, -73, 58, -6, 64, 120, -111, 4, 125, -41, 114, -124,
-64, -43 };
+
+ public static final byte[] STARTUP_PAYLOAD = new byte[] { 67, 65, 66, 89,
45, 65, 76, 69, 88 };
+ public static final byte[] SHUTDOWN_PAYLOAD = new byte[] { 66, 65, 66, 89,
45, 65, 76, 69, 88 };
+
+ private static final long ALIVE_HEARTBEAT_INTERVAL = 5000;
+ private static final long ALIVE_TIMEOUT_INTERVAL = 10000;
+
+ private final ConcurrentHashMap<StaticMemberKey, StaticMemberWrapper>
m_configuredMembers =
+ new ConcurrentHashMap<StaticMemberKey, StaticMemberWrapper>();
+
+ private final LinkedBlockingQueue<Member> m_remoteHeartbeats = new
LinkedBlockingQueue<Member>();
+
+ private Thread m_stateMonitor;
+ private volatile boolean m_runStateMonitor;
+
+ public StaticMemberMonitorInterceptor() {
+ super();
+ }
+
+ public void addStaticMember(Member member) {
+ if (member == null)
+ return;
+ StaticMemberKey key = StaticMemberKey.get(member);
+ m_configuredMembers.putIfAbsent(key, new StaticMemberWrapper(key,
member));
+ }
+
+ public void removeStaticMember(Member member) {
+ if (member == null)
+ return;
+ StaticMemberKey key = StaticMemberKey.get(member);
+ StaticMemberWrapper wrapper = m_configuredMembers.remove(key);
+ if (wrapper.isActive()) {
+ memberDisappeared(wrapper.getRemoteMember());
+ }
+ }
+
+ @Override
+ public void start(int svc) throws ChannelException {
+ for (Entry<StaticMemberKey, StaticMemberWrapper> entry :
m_configuredMembers.entrySet()) {
+ StaticMemberWrapper wrapper = entry.getValue();
+ sendHeartbeatMessage(wrapper.getConfiguredMember(),
STARTUP_PAYLOAD);
+ }
+ startStateMonitor();
+ super.start(svc);
+ }
+
+ @Override
+ public void stop(int svc) throws ChannelException {
+ stopStateMonitor();
+ for (Entry<StaticMemberKey, StaticMemberWrapper> entry :
m_configuredMembers.entrySet()) {
+ StaticMemberWrapper wrapper = entry.getValue();
+ sendHeartbeatMessage(wrapper.getConfiguredMember(),
SHUTDOWN_PAYLOAD);
+ }
+ super.stop(svc);
+ }
+
+ @Override
+ public synchronized void heartbeat() {
+ super.heartbeat();
+ if (m_runStateMonitor)
+ updateConfiguredMemberStates();
+ }
+
+ @Override
+ public void messageReceived(ChannelMessage msg) {
+ if (isHeartBeatMessage(msg.getMessage().getBytes())) {
+
+ try {
+ Member remoteMember =
MemberImpl.getMember(msg.getMessage().getBytes(), TCP_HEARTBEAT.length,
+ msg.getMessage().getLength() - TCP_HEARTBEAT.length);
+ m_remoteHeartbeats.offer(remoteMember);
+ return;
+ }
+ catch (IllegalArgumentException e) {
+ // Thrown by MemberImpl if something is wrong with the bytes.
It probably indicates that our
+ // HEARTBEAT prefix is not unique enough and needs attention.
We pass it on as a regular
+ // message as a fallback.
+ e.printStackTrace();
+ }
+ }
+ super.messageReceived(msg);
+ }
+
+ @Override
+ public void memberAdded(Member member) {
+ System.err.println("Added: " + member.toString());
+ super.memberAdded(member);
+ }
+
+ @Override
+ public void memberDisappeared(Member member) {
+ System.err.println("Removed: " + member.toString());
+ super.memberDisappeared(member);
+ }
+
+ private void startStateMonitor() {
+ m_runStateMonitor = true;
+ m_stateMonitor = new Thread(new Runnable() {
+ public void run() {
+ while (m_runStateMonitor) {
+ processHeartbeats();
+ }
+ }
+ });
+ m_stateMonitor.setDaemon(true);
+ m_stateMonitor.setPriority(Thread.MAX_PRIORITY);
+ m_stateMonitor.start();
+ }
+
+ private void stopStateMonitor() {
+ m_runStateMonitor = false;
+ }
+
+ private void processHeartbeats() {
+ try {
+ Member remoteMember = m_remoteHeartbeats.take();
+ long currentTimeMillis = System.currentTimeMillis();
+ StaticMemberKey key = StaticMemberKey.get(remoteMember);
+ StaticMemberWrapper wrapper = m_configuredMembers.get(key);
+ if (wrapper == null) {
+ return;
+ }
+ processHeartbeat(wrapper, remoteMember, currentTimeMillis);
+ }
+ catch (InterruptedException e) {}
+ }
+
+ private void processHeartbeat(StaticMemberWrapper wrapper, Member
remoteMember, long currentTimeMillis) {
+ if (!Arrays.equals(getLocalMember(false).getDomain(),
remoteMember.getDomain())) {
+ // member from other domain
+ return;
+ }
+ if (Arrays.equals(remoteMember.getCommand(), STARTUP_PAYLOAD)) {
+ // member startup
+ wrapper.reset();
+ wrapper.setRemoteMember(remoteMember);
+ wrapper.updateLastHeartbeat(currentTimeMillis);
+ if (sendHeartbeatMessage(wrapper.getConfiguredMember())) {
+ wrapper.updateLastReached(currentTimeMillis);
+ wrapper.setActive(true);
+ memberAdded(remoteMember);
+ }
+ return;
+ }
+ if (Arrays.equals(remoteMember.getCommand(), SHUTDOWN_PAYLOAD)) {
+ // member shutdown
+ if (wrapper.isActive()) {
+ wrapper.reset();
+ memberDisappeared(remoteMember);
+ }
+ return;
+ }
+ if (wrapper.getRemoteMember() == null) {
+ // member appears
+ wrapper.reset();
+ wrapper.setRemoteMember(remoteMember);
+ wrapper.updateLastHeartbeat(currentTimeMillis);
+ if (sendHeartbeatMessage(wrapper.getConfiguredMember())) {
+ wrapper.updateLastReached(currentTimeMillis);
+ wrapper.setActive(true);
+ memberAdded(remoteMember);
+ }
+ return;
+ }
+ if (!Arrays.equals(wrapper.getRemoteMember().getUniqueId(),
remoteMember.getUniqueId())) {
+ // member changes
+ memberDisappeared(remoteMember);
+ wrapper.reset();
+ wrapper.setRemoteMember(remoteMember);
+ wrapper.updateLastHeartbeat(currentTimeMillis);
+ if (sendHeartbeatMessage(wrapper.getConfiguredMember())) {
+ wrapper.updateLastReached(currentTimeMillis);
+ wrapper.setActive(true);
+ memberAdded(remoteMember);
+ }
+ return;
+ }
+ if (!wrapper.isActive()) {
+ // member inactive
+ wrapper.updateLastHeartbeat(currentTimeMillis);
+ if (sendHeartbeatMessage(wrapper.getConfiguredMember())) {
+ wrapper.updateLastReached(currentTimeMillis);
+ wrapper.setActive(true);
+ memberAdded(remoteMember);
+ }
+ return;
+ }
+ // all is well
+ wrapper.updateLastHeartbeat(currentTimeMillis);
+ }
+
+ private void updateConfiguredMemberStates() {
+ long currentTimeMillis = System.currentTimeMillis();
+ for (Entry<StaticMemberKey, StaticMemberWrapper> entry :
m_configuredMembers.entrySet()) {
+ StaticMemberWrapper wrapper = entry.getValue();
+ updateConfiguredMemberState(currentTimeMillis, wrapper);
+ }
+ }
+
+ private void updateConfiguredMemberState(long currentTimeMillis,
StaticMemberWrapper wrapper) {
+ if (wrapper.isActive()) {
+ // should not happen
+ if (wrapper.getRemoteMember() == null) {
+ wrapper.reset();
+ return;
+ }
+ // member silent
+ if ((currentTimeMillis - wrapper.getLastHeartbeat()) >
ALIVE_TIMEOUT_INTERVAL) {
+ memberDisappeared(wrapper.getRemoteMember());
+ wrapper.reset();
+ return;
+ }
+ // member stale
+ if ((currentTimeMillis - wrapper.getLastReached()) >
ALIVE_HEARTBEAT_INTERVAL) {
+ if (sendHeartbeatMessage(wrapper.getConfiguredMember())) {
+ wrapper.updateLastReached(currentTimeMillis);
+ }
+ else {
+ memberDisappeared(wrapper.getRemoteMember());
+ wrapper.reset();
+ }
+ }
+ return;
+ }
+ // wrapper is inactive
+ if ((currentTimeMillis - wrapper.getLastReached()) >
ALIVE_HEARTBEAT_INTERVAL) {
+ if (sendHeartbeatMessage(wrapper.getConfiguredMember())) {
+ wrapper.updateLastReached(currentTimeMillis);
+ }
+ }
+ // member revived
+ if ((currentTimeMillis - wrapper.getLastHeartbeat()) <
ALIVE_TIMEOUT_INTERVAL
+ && (currentTimeMillis - wrapper.getLastReached() <
ALIVE_HEARTBEAT_INTERVAL)) {
+ wrapper.setActive(true);
+ memberAdded(wrapper.getRemoteMember());
+ }
+ }
+
+ private boolean sendHeartbeatMessage(Member member) {
+ return sendHeartbeatMessage(member, null);
+ }
+
+ private boolean sendHeartbeatMessage(Member member, byte[] command) {
+ Socket socket = new Socket();
+ try {
+ InetAddress ia = InetAddress.getByAddress(member.getHost());
+ InetSocketAddress addr = new InetSocketAddress(ia,
member.getPort());
+ socket.setSoTimeout((int) 100);
+ socket.connect(addr, (int) 100);
+ ChannelData data = new ChannelData(true);
+ data.setAddress(member);
+
+ XByteBuffer mbrBuf = new XByteBuffer(TCP_HEARTBEAT, false);
+ MemberImpl localMember = (MemberImpl) getLocalMember(false);
+ localMember.setCommand(command);
+ byte[] mbrData = localMember.getData();
+ mbrBuf.append(mbrData, 0, mbrData.length);
+ data.setMessage(mbrBuf);
+ data.setTimestamp(System.currentTimeMillis());
+ int options = optionFlag | Channel.SEND_OPTIONS_BYTE_MESSAGE;
+ options = (options | Channel.SEND_OPTIONS_USE_ACK);
+ data.setOptions(options);
+ byte[] message = XByteBuffer.createDataPackage(data);
+ socket.getOutputStream().write(message);
+ int length = socket.getInputStream().read(message);
+ return length > 0;
+ }
+ catch (SocketTimeoutException sx) {}
+ catch (ConnectException cx) {}
+ catch (Exception x) {}
+ finally {
+ try {
+ socket.close();
+
+ }
+ catch (Exception ignore) {}
+ }
+ return false;
+ }
+
+ private boolean isHeartBeatMessage(byte[] bytes) {
+ if (bytes == null || bytes.length < TCP_HEARTBEAT.length)
+ return false;
+ for (int i = 0; i < TCP_HEARTBEAT.length; i++) {
+ if (bytes[i] != TCP_HEARTBEAT[i])
+ return false;
+ }
+ return true;
+ }
+
+ static class StaticMemberWrapper {
+
+ private final StaticMemberKey m_key;
+ private final Member m_configuredMember;
+
+ private Member m_remoteMember;
+ private Member m_updateMember;
+
+ private boolean m_active = false;
+
+ private long m_lastHeartbeat = 0;
+ private long m_lastReached = 0;
+
+ public StaticMemberWrapper(StaticMemberKey key, Member
configuredMember) {
+ m_key = key;
+ m_configuredMember = configuredMember;
+ }
+
+ public StaticMemberKey getKey() {
+ return m_key;
+ }
+
+ public Member getConfiguredMember() {
+ return m_configuredMember;
+ }
+
+ public Member getRemoteMember() {
+ return m_remoteMember;
+ }
+
+ public void setRemoteMember(Member remoteMember) {
+ m_remoteMember = remoteMember;
+ }
+
+ public Member getUpdateMember() {
+ return m_updateMember;
+ }
+
+ public void setUpdateMember(Member updateMember) {
+ m_updateMember = updateMember;
+ }
+
+ public void updateLastHeartbeat(long now) {
+ m_lastHeartbeat = now;
+ }
+
+ public long getLastHeartbeat() {
+ return m_lastHeartbeat;
+ }
+
+ public void updateLastReached(long now) {
+ m_lastReached = now;
+ }
+
+ public long getLastReached() {
+ return m_lastReached;
+ }
+
+ public boolean isActive() {
+ return m_active;
+ }
+
+ public void setActive(boolean state) {
+ m_active = state;
+ }
+
+ public void reset() {
+ m_active = false;
+ m_remoteMember = null;
+ m_updateMember = null;
+ m_lastHeartbeat = 0;
+ m_lastReached = 0;
+ }
+ }
+
+ static class StaticMemberKey {
+
+ private final static Map<Member, StaticMemberKey> KEY_CACHE = new
HashMap<Member, StaticMemberKey>();
+
+ public static StaticMemberKey get(Member member) {
+ StaticMemberKey key = KEY_CACHE.get(member);
+ if (key == null) {
+ key = new StaticMemberKey(member);
+ KEY_CACHE.put(member, key);
+ }
+ return key;
+ }
+
+ private final byte[] m_host;
+ private final int m_port;
+
+ public StaticMemberKey(Member member) {
+ m_host = member.getHost();
+ m_port = member.getPort();
+ }
+
+ public StaticMemberKey(byte[] hostname, int port) {
+ m_host = hostname;
+ m_port = port;
+ }
+
+ public byte[] getHostName() {
+ return m_host;
+ }
+
+ public int getPort() {
+ return m_port;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return ((StaticMemberKey) obj).getPort() == m_port
+ && Arrays.equals(((StaticMemberKey) obj).getHostName(),
m_host);
+ }
+
+ @Override
+ public int hashCode() {
+ return m_host[0] + m_host[1] + m_host[2] + m_host[3] + m_port;
+ }
+ }
+}
\ No newline at end of file
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMemberServiceBase.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMemberServiceBase.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/ClusterMemberServiceBase.java
Tue Jan 11 10:25:35 2011
@@ -21,13 +21,17 @@
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.amdatu.core.fabric.cluster.ClusterMember;
import org.amdatu.core.fabric.cluster.ClusterMemberService;
import org.amdatu.core.fabric.cluster.LocalTopicMessage;
+import org.amdatu.core.fabric.cluster.MemberAddedEvent;
+import org.amdatu.core.fabric.cluster.MemberRemovedEvent;
import org.amdatu.core.fabric.cluster.RoutableMessage;
import org.amdatu.core.fabric.cluster.internal.ClusterMemberUtilities;
+import org.amdatu.core.fabric.remote.internal.EndpointInvokeMessage;
import org.apache.felix.dm.Component;
import org.apache.felix.dm.DependencyManager;
import org.apache.felix.dm.ServiceDependency;
@@ -39,6 +43,8 @@
/**
* I manage cluster state
+ *
+ * TODO add a static(?) getConfigurationProperties method
*/
public abstract class ClusterMemberServiceBase implements ClusterMemberService
{
@@ -46,28 +52,26 @@
private final ReentrantReadWriteLock m_clusterMembersLock = new
ReentrantReadWriteLock();
private final String m_clusterId;
- private final String m_memberId;
private final Dictionary<String, Object> m_properties;
private final String m_recieveEventTopic;
private final String m_sendEventTopic;
- // injected
private volatile DependencyManager m_dependencyManager;
private volatile Component m_component;
private volatile EventAdmin m_eventAdmin;
private volatile LogService m_logService;
private volatile Component m_broadcastEventHandlerComponent;
+ private volatile String m_memberId;
/********************************************************
* Constructors
********************************************************/
- public ClusterMemberServiceBase(String clusterGroupId, String
clusterMemberId,
- Dictionary<String, Object> properties) {
+ public ClusterMemberServiceBase(String clusterGroupId, Dictionary<String,
Object> properties) {
m_clusterId = clusterGroupId;
- m_memberId = clusterMemberId;
+ m_memberId = UUID.randomUUID().toString();
m_properties = new Hashtable<String, Object>();
if (properties != null) {
Enumeration<String> enumeration = properties.keys();
@@ -85,35 +89,10 @@
********************************************************/
public final synchronized void init() {
-
- @SuppressWarnings("unchecked")
- Dictionary<String, Object> serviceProps =
m_component.getServiceProperties();
- if (serviceProps == null) {
- serviceProps = new Hashtable<String, Object>();
- }
-
- serviceProps.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY,
m_clusterId);
- serviceProps.put(ClusterMemberService.SERVICE_CLUSTERMEMBER_PROPERTY,
m_memberId);
- serviceProps.put(ClusterMemberService.SERVICE_CONFIGURATION_PROPERTY,
m_properties);
- m_component.setServiceProperties(serviceProps);
-
- ServiceDependency logServiceDependency =
m_dependencyManager.createServiceDependency();
- logServiceDependency.setService(LogService.class);
- logServiceDependency.setRequired(true);
- logServiceDependency.setInstanceBound(true);
- m_component.add(logServiceDependency);
-
- ServiceDependency eventAdminServiceDependency =
m_dependencyManager.createServiceDependency();
- eventAdminServiceDependency.setService(EventAdmin.class);
- eventAdminServiceDependency.setRequired(true);
- eventAdminServiceDependency.setInstanceBound(true);
- m_component.add(eventAdminServiceDependency);
-
- Dictionary<String, Object> props = new Hashtable<String, Object>();
- props.put(EventConstants.EVENT_TOPIC, new String[] { m_sendEventTopic
});
- m_broadcastEventHandlerComponent =
m_dependencyManager.createComponent();
-
m_broadcastEventHandlerComponent.setInterface(EventHandler.class.getName(),
props);
- m_broadcastEventHandlerComponent.setImplementation(new
BroadcastEventHandler());
+ updateServiceProperties();
+ initLogServiceDependency();
+ initEventAdminDependency();
+ initBroadcastEventHandlerComponent();
onInit();
}
@@ -122,15 +101,15 @@
}
public final synchronized void start() {
- m_logService.log(LogService.LOG_WARNING, "Starting
ClusterMemberService");
m_dependencyManager.add(m_broadcastEventHandlerComponent);
onStart();
+ m_logService.log(LogService.LOG_WARNING, "Started ClusterChannel: " +
m_clusterId + "/" + m_memberId);
}
public final synchronized void stop() {
- m_logService.log(LogService.LOG_WARNING, "Stopping
ClusterMemberService");
m_dependencyManager.remove(m_broadcastEventHandlerComponent);
onStop();
+ m_logService.log(LogService.LOG_WARNING, "Stopped ClusterChannel: " +
m_clusterId + "/" + m_memberId);
}
/********************************************************
@@ -149,6 +128,11 @@
return m_memberId;
}
+ protected final void setMemberId(String memberId) {
+ m_memberId = memberId;
+ updateServiceProperties();
+ }
+
protected final Dictionary<String, Object> getProperties() {
return m_properties;
@@ -182,6 +166,10 @@
finally {
m_clusterMembersLock.writeLock().unlock();
}
+ Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put(EVENT_MESSAGE_PROPERTY, new
MemberAddedEvent(clusterMember.getMemberId()));
+ Event broadCastEvent = new Event(m_recieveEventTopic, props);
+ m_eventAdmin.postEvent(broadCastEvent);
}
protected final void removeClusterMember(String memberId) {
@@ -192,9 +180,13 @@
finally {
m_clusterMembersLock.writeLock().unlock();
}
+ Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put(EVENT_MESSAGE_PROPERTY, new MemberRemovedEvent(memberId));
+ Event broadCastEvent = new Event(m_recieveEventTopic, props);
+ m_eventAdmin.postEvent(broadCastEvent);
}
- protected final void dispatchMessage(Object message) {
+ protected final void dispatchMessage(Object message, ClusterMember sender)
{
Dictionary<String, Object> props = new Hashtable<String, Object>();
props.put(EVENT_MESSAGE_PROPERTY, message);
if (message instanceof LocalTopicMessage) {
@@ -224,46 +216,72 @@
protected abstract void doSend(ClusterMember[] clusterMember, Object
message);
/********************************************************
- * Helper classes
+ * Private methods
********************************************************/
- class BroadcastEventHandler implements EventHandler {
+ private void updateServiceProperties() {
+ @SuppressWarnings("unchecked")
+ Dictionary<String, Object> serviceProps =
m_component.getServiceProperties();
+ if (serviceProps == null)
+ serviceProps = new Hashtable<String, Object>();
+ serviceProps.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY,
m_clusterId);
+ serviceProps.put(ClusterMemberService.SERVICE_CLUSTERMEMBER_PROPERTY,
m_memberId);
+ serviceProps.put(ClusterMemberService.SERVICE_CONFIGURATION_PROPERTY,
m_properties);
+ m_component.setServiceProperties(serviceProps);
+ }
- public void handleEvent(Event event) {
- Object message = event.getProperty(EVENT_MESSAGE_PROPERTY);
- if (message instanceof RoutableMessage) {
+ private void initBroadcastEventHandlerComponent() {
+ Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put(EventConstants.EVENT_TOPIC, new String[] { m_sendEventTopic
});
+ m_broadcastEventHandlerComponent =
m_dependencyManager.createComponent();
+
m_broadcastEventHandlerComponent.setInterface(EventHandler.class.getName(),
props);
+ m_broadcastEventHandlerComponent.setImplementation(new
BroadcastEventHandler());
+ }
+
+ private void initEventAdminDependency() {
+ ServiceDependency eventAdminServiceDependency =
m_dependencyManager.createServiceDependency();
+ eventAdminServiceDependency.setService(EventAdmin.class);
+ eventAdminServiceDependency.setRequired(true);
+ eventAdminServiceDependency.setInstanceBound(true);
+ m_component.add(eventAdminServiceDependency);
+ }
- RoutableMessage routableMessage = (RoutableMessage) message;
- routableMessage.setOriginClusterId(m_clusterId);
- routableMessage.setOriginMemberId(m_memberId);
+ private void initLogServiceDependency() {
+ ServiceDependency logServiceDependency =
m_dependencyManager.createServiceDependency();
+ logServiceDependency.setService(LogService.class);
+ logServiceDependency.setRequired(true);
+ logServiceDependency.setInstanceBound(true);
+ m_component.add(logServiceDependency);
+ }
- if (routableMessage.getTargetClusterId() == null) {
- routableMessage.setTargetClusterId(m_clusterId);
- }
- else {
- // FIXME address this
- if
(!routableMessage.getTargetClusterId().equals(getClusterId())) {
- m_logService.log(LogService.LOG_ERROR,
RoutableMessage.class.getSimpleName()
- + " is not for this cluster: " +
routableMessage.getTargetClusterId());
- }
- }
+ /********************************************************
+ * Helper classes
+ ********************************************************/
- ClusterMember clusterMember = null;
- if (routableMessage.getTargetMemberId() != null) {
- clusterMember =
getClusterMember(routableMessage.getTargetMemberId());
- if (clusterMember == null) {
- // FIXME address this
- m_logService.log(LogService.LOG_ERROR, "RoutedMessage
specifies unknown target member: "
- + routableMessage.getTargetMemberId());
- }
- }
+ class BroadcastEventHandler implements EventHandler {
- if (clusterMember != null) {
- doSend(new ClusterMember[] { clusterMember }, message);
+ public void handleEvent(Event event) {
+ Object message = event.getProperty(EVENT_MESSAGE_PROPERTY);
+ if (!(message instanceof RoutableMessage)) {
+ doBroadcast(message);
+ return;
+ }
+ RoutableMessage routableMessage = (RoutableMessage) message;
+ routableMessage.setOriginClusterId(m_clusterId);
+ routableMessage.setOriginMemberId(m_memberId);
+ routableMessage.setTargetClusterId(m_clusterId);
+ if (routableMessage.getTargetMemberId() != null) {
+ ClusterMember clusterMember =
getClusterMember(routableMessage.getTargetMemberId());
+ if (clusterMember == null) {
+ m_logService.log(LogService.LOG_ERROR, "RoutedMessage
specifies unknown target member: "
+ + routableMessage.toString());
return;
}
+ doSend(new ClusterMember[] { clusterMember }, message);
+ }
+ else {
+ doBroadcast(message);
}
- doBroadcast(message);
}
}
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/cluster/service/tribes/TribesClusterMemberServiceImpl.java
Tue Jan 11 10:25:35 2011
@@ -16,24 +16,20 @@
*/
package org.amdatu.core.fabric.cluster.service.tribes;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.Dictionary;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.amdatu.core.fabric.cluster.ClusterMember;
-import org.amdatu.core.fabric.cluster.ClusterMemberService;
import org.amdatu.core.fabric.cluster.internal.ClusterMemberImpl;
-import org.amdatu.core.fabric.cluster.internal.tribes.ChannelCreator;
+import org.amdatu.core.fabric.cluster.internal.tribes.ChannelBuilder;
import org.amdatu.core.fabric.cluster.service.ClusterMemberServiceBase;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
@@ -41,17 +37,16 @@
import org.apache.catalina.tribes.ManagedChannel;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.util.Arrays;
import org.osgi.service.log.LogService;
public final class TribesClusterMemberServiceImpl extends
ClusterMemberServiceBase {
public static final String CLUSTER_TRIBES_ARGS_PROP =
"org.amdatu.fabric.cluster.tribes.args";
- private final Map<String, Member> m_memberIdMembers = new HashMap<String,
Member>();
- private final ReentrantReadWriteLock m_memberIdMembersLock = new
ReentrantReadWriteLock();
-
- private final Set<Member> m_faultyMembers = new HashSet<Member>();
- private final ReentrantReadWriteLock m_faultyMembersLock = new
ReentrantReadWriteLock();
+ private final Map<String, Member> m_members = new HashMap<String,
Member>();
+ private final ReentrantReadWriteLock m_membersLock = new
ReentrantReadWriteLock();
+ private volatile Member[] m_membersA = new Member[0];
private volatile ManagedChannel m_managedChannel;
@@ -59,9 +54,9 @@
* Constructors
********************************************************/
- public TribesClusterMemberServiceImpl(String clusterGroupId, String
clusterMemberId,
+ public TribesClusterMemberServiceImpl(String clusterGroupId,
Dictionary<String, Object> properties) {
- super(clusterGroupId, clusterMemberId, properties);
+ super(clusterGroupId, properties);
}
/********************************************************
@@ -75,19 +70,23 @@
}
protected synchronized void onStart() {
+ getLogService().log(LogService.LOG_DEBUG, "Starting managed channel");
try {
- getLogService().log(LogService.LOG_DEBUG, "Starting managed
channel");
m_managedChannel =
- (ManagedChannel) ChannelCreator.createChannel((String[])
getProperties().get(
- CLUSTER_TRIBES_ARGS_PROP));
+ (ManagedChannel) ChannelBuilder
+ .createChannel(getClusterId(), getProperties());
Properties props = new Properties();
props.setProperty(SERVICE_CLUSTERCHANNEL_PROPERTY, getClusterId());
props.setProperty(SERVICE_CLUSTERMEMBER_PROPERTY, getMemberId());
+
+
m_managedChannel.getMembershipService().setPayload(getPayload(props));
m_managedChannel.addMembershipListener(new
TribesMembershipListener());
m_managedChannel.addChannelListener(new TribesChannelListener());
-
m_managedChannel.getMembershipService().setPayload(getPayload(props));
m_managedChannel.start(Channel.DEFAULT);
+
+ Member localMember =
m_managedChannel.getMembershipService().getLocalMember(false);
+ setMemberId(Arrays.toString(localMember.getUniqueId()));
}
catch (Exception e) {
getLogService().log(LogService.LOG_ERROR, "Exception while
starting managed channel", e);
@@ -95,8 +94,8 @@
}
protected synchronized void onStop() {
+ getLogService().log(LogService.LOG_DEBUG, "Stopping managed channel");
try {
- getLogService().log(LogService.LOG_DEBUG, "Stopping managed
channel");
m_managedChannel.stop(Channel.DEFAULT);
}
catch (Exception e) {
@@ -116,24 +115,14 @@
+ message.toString());
return;
}
- m_memberIdMembersLock.readLock().lock();
- try {
- if (m_memberIdMembers.size() == 0) {
- getLogService().log(LogService.LOG_WARNING,
+ Member[] members = m_membersA;
+ if (members.length == 0) {
+ getLogService().log(LogService.LOG_WARNING,
"Dropping message during send because there are no
active members on my channel: "
+ message.toString());
- return;
- }
- Member[] members = m_memberIdMembers.values().toArray(new
Member[m_memberIdMembers.size()]);
- m_managedChannel.send(members, (Serializable) message,
Channel.SEND_OPTIONS_ASYNCHRONOUS);
- }
- catch (ChannelException e) {
- getLogService().log(LogService.LOG_ERROR,
- "Exception during send on managed channel: " +
message.toString(), e);
- }
- finally {
- m_memberIdMembersLock.readLock().unlock();
+ return;
}
+ sendToMembers(message, members);
}
@Override
@@ -145,30 +134,27 @@
return;
}
List<Member> memberList = new LinkedList<Member>();
- m_memberIdMembersLock.readLock().lock();
+ m_membersLock.readLock().lock();
try {
for (ClusterMember clusterMember : clusterMembers) {
- Member member =
m_memberIdMembers.get(clusterMember.getMemberId());
+ Member member = m_members.get(clusterMember.getMemberId());
if (member != null) {
memberList.add(member);
}
}
- if (memberList.size() == 0) {
- getLogService().log(LogService.LOG_WARNING,
- "Dropping message during send because there are no
matching members on my channel: "
- + message.toString());
- return;
- }
- Member[] members = memberList.toArray(new
Member[memberList.size()]);
- m_managedChannel.send(members, (Serializable) message,
Channel.SEND_OPTIONS_ASYNCHRONOUS);
- }
- catch (ChannelException e) {
- getLogService().log(LogService.LOG_ERROR,
- "Exception during send on managed channel: " +
message.toString(), e);
}
finally {
- m_memberIdMembersLock.readLock().unlock();
+ m_membersLock.readLock().unlock();
+ }
+
+ if (memberList.size() == 0) {
+ getLogService().log(LogService.LOG_WARNING,
+ "Dropping message during send because there are no
matching members on my channel: "
+ + message.toString());
+ return;
}
+ Member[] members = memberList.toArray(new Member[memberList.size()]);
+ sendToMembers(message, members);
}
/********************************************************
@@ -181,53 +167,45 @@
return bout.toByteArray();
}
- private Properties getProperties(byte[] payload) throws IOException {
- ByteArrayInputStream bin = new ByteArrayInputStream(payload);
- Properties props = new Properties();
- props.load(bin);
- return props;
- }
-
- private void addMemberIdMember(String clusterMember, Member member) {
- m_memberIdMembersLock.writeLock().lock();
+ private void memberAdded(Member member) {
+ String memberId = getMemberId(member);
+ m_membersLock.writeLock().lock();
try {
- m_memberIdMembers.put(clusterMember, member);
+ m_members.put(memberId, member);
+ m_membersA = m_members.values().toArray(new
Member[m_members.size()]);
}
finally {
- m_memberIdMembersLock.writeLock().unlock();
+ m_membersLock.writeLock().unlock();
}
- addClusterMember(new ClusterMemberImpl(clusterMember));
+ super.addClusterMember(new ClusterMemberImpl(memberId));
}
- private void removeMemberIdMember(String clusterMember) {
- m_memberIdMembersLock.writeLock().lock();
+ private void memberDisappeared(Member member) {
+ String memberId = getMemberId(member);
+ m_membersLock.writeLock().lock();
try {
- m_memberIdMembers.remove(clusterMember);
+ m_members.remove(memberId);
+ m_membersA = m_members.values().toArray(new
Member[m_members.size()]);
}
finally {
- m_memberIdMembersLock.writeLock().unlock();
+ m_membersLock.writeLock().unlock();
}
- removeClusterMember(clusterMember);
+ super.removeClusterMember(memberId);
}
- private void addFaultyMember(Member member) {
- m_faultyMembersLock.writeLock().lock();
+ private void sendToMembers(Object message, Member[] members) {
try {
- m_faultyMembers.add(member);
+ m_managedChannel.send(members, (Serializable) message,
Channel.SEND_OPTIONS_ASYNCHRONOUS);
+ getLogService().log(LogService.LOG_WARNING, "messageSend: " +
message.toString());
}
- finally {
- m_faultyMembersLock.writeLock().unlock();
+ catch (ChannelException e) {
+ getLogService().log(LogService.LOG_ERROR,
+ "Exception during send on managed channel: " +
message.toString(), e);
}
}
- private boolean removeFaultyMember(Member member) {
- m_faultyMembersLock.writeLock().lock();
- try {
- return m_faultyMembers.remove(member);
- }
- finally {
- m_faultyMembersLock.writeLock().unlock();
- }
+ private String getMemberId(Member member) {
+ return Arrays.toString(member.getUniqueId());
}
/********************************************************
@@ -237,65 +215,31 @@
class TribesMembershipListener implements MembershipListener {
public void memberAdded(Member member) {
- try {
- Properties props = getProperties(member.getPayload());
- String groupchannel = (String)
props.get(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY);
- if (groupchannel == null ||
!groupchannel.equals(getClusterId())) {
- getLogService().log(LogService.LOG_ERROR,
- "Member joined with invalid channelid! Will ignore it:
" + member.toString());
- addFaultyMember(member);
- return;
- }
- String channelmember = (String)
props.get(ClusterMemberService.SERVICE_CLUSTERMEMBER_PROPERTY);
- if (channelmember == null ||
channelmember.equals(getMemberId())
- || getClusterMember(channelmember) != null) {
- getLogService().log(LogService.LOG_ERROR,
- "Member joined with invalid memberid! Will ignore it:
" + member.toString());
- addFaultyMember(member);
- return;
- }
- addMemberIdMember(channelmember, member);
- getLogService().log(LogService.LOG_DEBUG, "Member added: " +
member.toString());
- }
- catch (Exception e) {
- getLogService().log(LogService.LOG_ERROR, "Exception while
adding member: " + member.toString(), e);
- }
+ TribesClusterMemberServiceImpl.this.memberAdded(member);
}
public void memberDisappeared(Member member) {
- try {
- if (removeFaultyMember(member)) {
- getLogService().log(LogService.LOG_DEBUG, "Faulty member
disappeared: " + member.toString());
- return;
- }
- String memberId =
getProperties(member.getPayload()).getProperty(SERVICE_CLUSTERMEMBER_PROPERTY);
- removeMemberIdMember(memberId);
- }
- catch (Exception e) {
- getLogService().log(LogService.LOG_ERROR, "Exception while
removing member: " + member.toString(), e);
- }
+ TribesClusterMemberServiceImpl.this.memberDisappeared(member);
}
}
class TribesChannelListener implements ChannelListener {
public boolean accept(Serializable message, Member member) {
- m_faultyMembersLock.readLock().lock();
- try {
- if (m_faultyMembers.contains(member)) {
- getLogService().log(LogService.LOG_WARNING,
- "Dropping message recieved from faulty member: " +
member.toString());
- return false;
- }
- }
- finally {
- m_faultyMembersLock.readLock().unlock();
- }
return true;
}
public void messageReceived(Serializable message, Member member) {
- dispatchMessage(message);
+ String memberId = getMemberId(member);
+ ClusterMember clusterMember = getClusterMember(memberId);
+ if (clusterMember == null) {
+ getLogService().log(LogService.LOG_ERROR,
+ "Dropping message for active member: " +
message.toString());
+ return;
+ }
+ getLogService().log(LogService.LOG_WARNING,
+ "messageRecieved: " + message.toString());
+ dispatchMessage(message, clusterMember);
}
}
}
\ No newline at end of file
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/ServiceEndPoint.java
Tue Jan 11 10:25:35 2011
@@ -121,21 +121,28 @@
@Override
public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("ServiceEndPoint[");
- sb.append("clusterid=" + getClusterId());
- sb.append("; memberid=" + getMemberId());
- sb.append("; objectClass={");
- for (String part : getObjectClass()) {
- sb.append(" " + part);
+ StringBuilder sb = new
StringBuilder(ServiceEndPoint.class.getSimpleName() + "{");
+ sb.append("\n\tclusterid=" + getClusterId());
+ sb.append("\n\tmemberid=" + getMemberId());
+ sb.append("\n\tobjectClass=[");
+ for (int i = 0; i < getObjectClass().length; i++) {
+ if (i > 0)
+ sb.append(",");
+ sb.append(getObjectClass()[i]);
}
- sb.append(" }; properties={");
+ sb.append("]");
+ sb.append("\n\tproperties=[");
Enumeration<String> enumeration = getProperties().keys();
+ boolean first = true;
while (enumeration.hasMoreElements()) {
String key = (String) enumeration.nextElement();
- sb.append(" " + key + ":" + getProperties().get(key).toString());
+ if (!first)
+ sb.append(",");
+ sb.append(key + ":" + getProperties().get(key).toString());
+ first = false;
}
- sb.append("}]");
+ sb.append("\n\t]");
+ sb.append("\n}");
return sb.toString();
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/DiscoveryUtilities.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/DiscoveryUtilities.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/DiscoveryUtilities.java
Tue Jan 11 10:25:35 2011
@@ -1,3 +1,19 @@
+/*
+ Copyright (C) 2010 Amdatu.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
package org.amdatu.core.fabric.remote.internal;
import org.amdatu.core.fabric.remote.DiscoveryService;
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDepublishMessage.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDepublishMessage.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDepublishMessage.java
Tue Jan 11 10:25:35 2011
@@ -16,15 +16,17 @@
*/
package org.amdatu.core.fabric.remote.internal;
-import java.io.Serializable;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.amdatu.core.fabric.cluster.LocalTopicMessage;
import org.amdatu.core.fabric.cluster.RoutableMessage;
import org.amdatu.core.fabric.remote.ServiceEndPoint;
-public class EndpointDepublishMessage extends RoutableMessage implements
LocalTopicMessage, Serializable {
+public class EndpointDepublishMessage extends RoutableMessage implements
LocalTopicMessage {
private static final long serialVersionUID = 1L;
+ private static final transient Pattern p = Pattern.compile("\n");
private final ServiceEndPoint m_serviceEndPoint;
@@ -43,4 +45,15 @@
return
DiscoveryUtilities.getLocalDiscoveryTopic(m_serviceEndPoint.getClusterId(),
m_serviceEndPoint.getServiceGroup());
}
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new
StringBuilder(EndpointDepublishMessage.class.getSimpleName() + "{");
+ Matcher m1 = p.matcher(super.toString());
+ sb.append("\n\tsuper=" + m1.replaceAll("\n\t"));
+ Matcher m2 = p.matcher(m_serviceEndPoint.toString());
+ sb.append("\n\tserviceEndPoint=" + m2.replaceAll("\n\t"));
+ sb.append("\n}");
+ return sb.toString();
+ }
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDiscoveryMessage.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDiscoveryMessage.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointDiscoveryMessage.java
Tue Jan 11 10:25:35 2011
@@ -16,18 +16,22 @@
*/
package org.amdatu.core.fabric.remote.internal;
-import java.io.Serializable;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.amdatu.core.fabric.cluster.LocalTopicMessage;
+import org.amdatu.core.fabric.cluster.RoutableMessage;
-public class EndpointDiscoveryMessage implements LocalTopicMessage,
Serializable {
+public class EndpointDiscoveryMessage extends RoutableMessage implements
LocalTopicMessage {
private static final long serialVersionUID = 1L;
+ private static final transient Pattern p = Pattern.compile("\n");
private final String m_clusterId;
private final String m_serviceGroup;
public EndpointDiscoveryMessage(String clusterId, String serviceGroup) {
+ super(clusterId, null, serviceGroup);
m_clusterId = clusterId;
m_serviceGroup = serviceGroup;
}
@@ -36,4 +40,16 @@
return DiscoveryUtilities.getLocalDiscoveryTopic(m_clusterId,
m_serviceGroup);
}
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new
StringBuilder(EndpointDiscoveryMessage.class.getSimpleName() + "{");
+ Matcher m1 = p.matcher(super.toString());
+ sb.append("\n\tsuper=" + m1.replaceAll("\n\t"));
+ sb.append("\n\tclusterId=" + m_clusterId);
+ sb.append("\n\tmemberId=" + getOriginMemberId());
+ sb.append("\n\tserviceGroup=" + m_serviceGroup);
+ sb.append("\n}");
+ return sb.toString();
+ }
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointInvokeMessage.java
Tue Jan 11 10:25:35 2011
@@ -17,6 +17,8 @@
package org.amdatu.core.fabric.remote.internal;
import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.amdatu.core.fabric.cluster.LocalTopicMessage;
import org.amdatu.core.fabric.cluster.RoutableMessage;
@@ -25,6 +27,7 @@
public class EndpointInvokeMessage extends RoutableMessage implements
LocalTopicMessage {
private static final long serialVersionUID = 1L;
+ private static final transient Pattern p = Pattern.compile("\n");
private final ServiceEndPoint m_serviceEndPoint;
private final Map<String, Object> m_payload;
@@ -47,4 +50,16 @@
return DistributionUtilities.getLocalInvokeTopic(getTargetClusterId(),
getTargetServiceGroup());
}
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("\nEndpointInvokeMessage{");
+ Matcher m1 = p.matcher(super.toString());
+ sb.append("\n\tsuper=" + m1.replaceAll("\n\t"));
+ Matcher m2 = p.matcher(m_serviceEndPoint.toString());
+ sb.append("\n\tserviceEndPoint=" + m2.replaceAll("\n\t"));
+ sb.append("\n\tpayload=" + m_payload.toString());
+ sb.append("\n}");
+ return sb.toString();
+ }
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointPublishMessage.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointPublishMessage.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointPublishMessage.java
Tue Jan 11 10:25:35 2011
@@ -16,6 +16,9 @@
*/
package org.amdatu.core.fabric.remote.internal;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
import org.amdatu.core.fabric.cluster.LocalTopicMessage;
import org.amdatu.core.fabric.cluster.RoutableMessage;
import org.amdatu.core.fabric.remote.ServiceEndPoint;
@@ -23,6 +26,7 @@
public class EndpointPublishMessage extends RoutableMessage implements
LocalTopicMessage {
private static final long serialVersionUID = 1L;
+ private static final transient Pattern p = Pattern.compile("\n");
private ServiceEndPoint m_serviceEndPoint;
@@ -41,4 +45,15 @@
return
DiscoveryUtilities.getLocalDiscoveryTopic(m_serviceEndPoint.getClusterId(),
m_serviceEndPoint.getServiceGroup());
}
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new
StringBuilder(EndpointPublishMessage.class.getSimpleName() + "{");
+ Matcher m1 = p.matcher(super.toString());
+ sb.append("\n\tsuper=" + m1.replaceAll("\n\t"));
+ Matcher m2 = p.matcher(m_serviceEndPoint.toString());
+ sb.append("\n\tserviceEndPoint=" + m2.replaceAll("\n\t"));
+ sb.append("\n}");
+ return sb.toString();
+ }
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/EndpointResponseMessage.java
Tue Jan 11 10:25:35 2011
@@ -17,6 +17,8 @@
package org.amdatu.core.fabric.remote.internal;
import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.amdatu.core.fabric.cluster.LocalTopicMessage;
import org.amdatu.core.fabric.cluster.RoutableMessage;
@@ -24,6 +26,7 @@
public class EndpointResponseMessage extends RoutableMessage implements
LocalTopicMessage {
private static final long serialVersionUID = 1L;
+ private static final transient Pattern p = Pattern.compile("\n");
private Map<String, Object> m_payload;
@@ -42,4 +45,14 @@
return
DistributionUtilities.getLocalResponseTopic(getTargetClusterId(),
getTargetServiceGroup());
}
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new
StringBuilder(EndpointResponseMessage.class.getSimpleName() + "{");
+ Matcher m1 = p.matcher(super.toString());
+ sb.append("\n\tsuper=" + m1.replaceAll("\n\t"));
+ sb.append("\n\tpayload=" + m_payload.toString());
+ sb.append("\n}");
+ return sb.toString();
+ }
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/internal/LocalServiceInvocationHandler.java
Tue Jan 11 10:25:35 2011
@@ -68,6 +68,8 @@
private final String m_clusterGroupId;
private final String m_serviceGroupId;
+ private final String m_clusterChannelInvokeTopic;
+
/********************************************************
* Constructors
********************************************************/
@@ -83,6 +85,7 @@
m_serviceInterfaceMethods.add(serviceMethod);
}
}
+ m_clusterChannelInvokeTopic =
ClusterMemberUtilities.getClusterChannelSendTopic(m_clusterGroupId);
}
/********************************************************
@@ -126,7 +129,7 @@
}
public synchronized void stop() {
- m_logService.log(LogService.LOG_WARNING, "Starting
LocalServiceInvocationHandler");
+ m_logService.log(LogService.LOG_WARNING, "Stopping
LocalServiceInvocationHandler");
m_dependencyManager.remove(m_serviceInvocationEventHandlerComponent);
}
@@ -145,9 +148,7 @@
Dictionary<String, Object> eventPayload = new Hashtable<String,
Object>();
eventPayload.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY,
message);
- Event event =
- new
Event(ClusterMemberUtilities.getClusterChannelSendTopic(m_clusterGroupId),
- eventPayload);
+ Event event = new Event(m_clusterChannelInvokeTopic, eventPayload);
m_eventAdmin.postEvent(event);
Object response = retrieveInvocationResponse(invocationIdentifier);
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/remote/service/DiscoveryServiceImpl.java
Tue Jan 11 10:25:35 2011
@@ -20,11 +20,15 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.amdatu.core.fabric.cluster.ClusterMemberService;
+import org.amdatu.core.fabric.cluster.MemberAddedEvent;
+import org.amdatu.core.fabric.cluster.MemberRemovedEvent;
import org.amdatu.core.fabric.cluster.internal.ClusterMemberUtilities;
import org.amdatu.core.fabric.remote.DiscoveryService;
import org.amdatu.core.fabric.remote.RemotableServiceEndpoint;
@@ -126,7 +130,8 @@
Dictionary<String, Object> eventHandlerProps = new Hashtable<String,
Object>();
eventHandlerProps.put(EventConstants.EVENT_TOPIC,
- new String[] {
DiscoveryUtilities.getLocalDiscoveryTopic(m_clusterGroupId, m_serviceGroupId)
});
+ new String[] {
DiscoveryUtilities.getLocalDiscoveryTopic(m_clusterGroupId, m_serviceGroupId),
+
ClusterMemberUtilities.getClusterChannelReceiveTopic(m_clusterGroupId) });
m_discoveryEventHandlerComponent =
m_dependencyManager.createComponent();
m_discoveryEventHandlerComponent.setInterface(EventHandler.class.getName(),
eventHandlerProps);
m_discoveryEventHandlerComponent.setImplementation(new
DiscoveryEventHandler());
@@ -139,7 +144,7 @@
public synchronized void start() {
m_logService.log(LogService.LOG_INFO, "Starting " + toString());
m_dependencyManager.add(m_discoveryEventHandlerComponent);
- m_evenAdmin.postEvent(createDiscoveryEvent());
+ m_evenAdmin.postEvent(createDiscoveryEvent(null));
}
public synchronized void stop() {
@@ -159,7 +164,7 @@
try {
if (!m_remotableEndPoints.contains(serviceEndPoint)) {
m_remotableEndPoints.add(serviceEndPoint);
-
m_evenAdmin.postEvent(createEndpointPublishEvent(serviceEndPoint));
+
m_evenAdmin.postEvent(createEndpointPublishEvent(serviceEndPoint, null));
}
else {
throw new IllegalStateException("Unexpected state... needs
analysis");
@@ -194,7 +199,15 @@
* Private methods
********************************************************/
- private Event createDiscoveryEvent() {
+ private Event createDiscoveryEvent(MemberAddedEvent memberAddedEvent) {
+ EndpointDiscoveryMessage endpointDiscoveryMessage =
+ new EndpointDiscoveryMessage(m_clusterGroupId, m_serviceGroupId);
+
+ if (memberAddedEvent != null) {
+ endpointDiscoveryMessage.setTargetClusterId(m_clusterGroupId);
+
endpointDiscoveryMessage.setTargetMemberId(memberAddedEvent.getMemberId());
+ }
+
Dictionary<String, Object> eventProps = new Hashtable<String,
Object>();
eventProps.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY,
new EndpointDiscoveryMessage(m_clusterGroupId, m_serviceGroupId));
@@ -204,9 +217,17 @@
return discoveryEvent;
}
- private Event createEndpointPublishEvent(ServiceEndPoint serviceEndPoint) {
+ private Event createEndpointPublishEvent(ServiceEndPoint serviceEndPoint,
+ EndpointDiscoveryMessage endpointDiscoveryMessage) {
Dictionary<String, Object> props = new Hashtable<String, Object>();
- props.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY, new
EndpointPublishMessage(serviceEndPoint));
+ EndpointPublishMessage endpointPublishMessage = new
EndpointPublishMessage(serviceEndPoint);
+
+ if (endpointDiscoveryMessage != null) {
+
endpointPublishMessage.setTargetClusterId(endpointDiscoveryMessage.getOriginClusterId());
+
endpointPublishMessage.setTargetMemberId(endpointDiscoveryMessage.getOriginMemberId());
+ }
+
+ props.put(ClusterMemberService.EVENT_MESSAGE_PROPERTY,
endpointPublishMessage);
Event event =
new
Event(ClusterMemberUtilities.getClusterChannelSendTopic(m_clusterGroupId),
props);
@@ -239,7 +260,7 @@
m_remotableEndPointsLock.writeLock().lock();
try {
for (ServiceEndPoint serviceEndPoint : m_remotableEndPoints) {
- Event event = createEndpointPublishEvent(serviceEndPoint);
+ Event event = createEndpointPublishEvent(serviceEndPoint,
endpointDiscoveryMessage);
m_evenAdmin.postEvent(event);
}
}
@@ -279,6 +300,8 @@
m_clusterGroupId);
distributionProps.put(DiscoveryService.REMOTE_SERVICEGROUPID_PROP,
m_serviceGroupId);
+
distributionProps.put(ClusterMemberService.SERVICE_CLUSTERMEMBER_PROPERTY,
+ serviceEndPoint.getMemberId());
serviceComponent =
m_dependencyManager.createComponent()
.setInterface(RemoteServiceEndPoint.class.getName(),
distributionProps)
@@ -300,6 +323,43 @@
return;
}
+ private void recieveMemberAddedEvent(final MemberAddedEvent
memberAddedEvent) {
+ m_logService.log(LogService.LOG_ERROR, "Recieved \n" +
memberAddedEvent.toString());
+ m_evenAdmin.postEvent(createDiscoveryEvent(memberAddedEvent));
+ }
+
+ private void recieveMemberRemovedEvent(final MemberRemovedEvent
memberRemovedEvent) {
+ m_logService.log(LogService.LOG_ERROR, "Recieved \n" +
memberRemovedEvent.toString());
+ List<ServiceEndPoint> removeServiceEndpointList = null;
+ List<Component> removeServiceComponentList = null;
+ m_remoteEndPointComponentsLock.writeLock().lock();
+ try {
+ for (ServiceEndPoint serviceEndPoint :
m_remoteEndPointComponents.keySet()) {
+ if
(serviceEndPoint.getMemberId().equals(memberRemovedEvent.getMemberId())) {
+ if (removeServiceEndpointList == null)
+ removeServiceEndpointList = new
LinkedList<ServiceEndPoint>();
+ removeServiceEndpointList.add(serviceEndPoint);
+ }
+ }
+ if (removeServiceEndpointList != null) {
+ removeServiceComponentList = new LinkedList<Component>();
+ for (ServiceEndPoint serviceEndPoint :
removeServiceEndpointList) {
+ m_logService.log(LogService.LOG_ERROR, "Removing\n" +
memberRemovedEvent.toString());
+
removeServiceComponentList.add(m_remoteEndPointComponents.remove(serviceEndPoint));
+ }
+ }
+ }
+ finally {
+ m_remoteEndPointComponentsLock.writeLock().unlock();
+ }
+ if (removeServiceComponentList != null) {
+ for (Component serviceComponent : removeServiceComponentList) {
+ m_dependencyManager.remove(serviceComponent);
+ }
+ }
+ return;
+ }
+
/********************************************************
* Helper classes
********************************************************/
@@ -320,6 +380,14 @@
recieveEndpointDiscoveryMessage((EndpointDiscoveryMessage)
message);
return;
}
+ if (message instanceof MemberAddedEvent) {
+ recieveMemberAddedEvent((MemberAddedEvent) message);
+ return;
+ }
+ if (message instanceof MemberRemovedEvent) {
+ recieveMemberRemovedEvent((MemberRemovedEvent) message);
+ return;
+ }
throw new IllegalStateException("Unknown message type " +
message.getClass().getName() + "on channel "
+ DiscoveryUtilities.getLocalDiscoveryTopic(m_clusterGroupId,
m_serviceGroupId));
}
Modified:
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java
==============================================================================
---
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java
(original)
+++
sandbox/bdekruijff/fabric/src/main/java/org/amdatu/core/fabric/service/FabricManagerServiceImpl.java
Tue Jan 11 10:25:35 2011
@@ -99,7 +99,7 @@
properties
.put(TribesClusterMemberServiceImpl.CLUSTER_TRIBES_ARGS_PROP, args.split(" "));
- createClusterMember(clusterGroupId, clusterMemberId,
properties);
+ createClusterChannel(clusterGroupId, properties);
}
}
@@ -125,26 +125,19 @@
* FabricManagerService methods
********************************************************/
- public boolean createClusterMember(String clusterGroupId, String
clusterMemberId,
- Dictionary<String, Object> properties) {
- if (!ClusterMemberUtilities.isValidChannelName(clusterGroupId)) {
- m_logService.log(LogService.LOG_ERROR, "Failed to create cluster.
Invalid clustername: " + clusterGroupId);
- return false;
- }
- if (!ClusterMemberUtilities.isValidMemberName(clusterMemberId)) {
- m_logService.log(LogService.LOG_ERROR, "Failed to create cluster.
Invalid membername: " + clusterMemberId);
+ public boolean createClusterChannel(String clusterChannelName,
Dictionary<String, Object> clusterChannelProperties) {
+ if (!ClusterMemberUtilities.isValidChannelName(clusterChannelName)) {
+ m_logService.log(LogService.LOG_ERROR, "Failed to create
clusterchannel. Invalid clusterChannelName: "
+ + clusterChannelName);
return false;
}
- Dictionary<String, Object> svcProperties = new Hashtable<String,
Object>();
-// svcProperties.put(ClusterMemberService.SERVICE_CLUSTERCHANNEL_PROPERTY,
clusterMemberId);
-// svcProperties.put(ClusterMemberService.SERVICE_CLUSTERMEMBER_PROPERTY,
clusterMemberId);
-
Component clusterMemberComponent =
m_dependencyManager
.createComponent()
- .setInterface(ClusterMemberService.class.getName(), properties)
- .setFactory(new ClusterMemberServiceFactory(clusterGroupId,
clusterMemberId, properties), "getInstance")
+ .setInterface(ClusterMemberService.class.getName(),
clusterChannelProperties)
+ .setFactory(new
ClusterMemberServiceFactory(clusterChannelName, clusterChannelProperties),
+ "getInstance")
.add(
m_dependencyManager.createServiceDependency()
.setService(FabricManagerService.class)
@@ -152,11 +145,12 @@
m_clusterMemberComponentsLock.writeLock().lock();
try {
- if (m_clusterMemberComponents.containsKey(getKey(clusterGroupId,
clusterMemberId, ""))) {
+ if (m_clusterMemberComponents.containsKey(clusterChannelName)) {
m_dependencyManager.remove(m_clusterMemberComponents
- .remove(getKey(clusterGroupId, clusterMemberId, "")));
+ .remove(clusterChannelName));
+
}
- m_clusterMemberComponents.put(getKey(clusterGroupId,
clusterMemberId, ""), clusterMemberComponent);
+ m_clusterMemberComponents.put(clusterChannelName,
clusterMemberComponent);
m_dependencyManager.add(clusterMemberComponent);
return true;
}
@@ -165,16 +159,17 @@
}
}
- public boolean removeClusterMember(String clusterGroupId, String
clusterMemberId) {
- if (clusterMemberId == null || "".equals(clusterMemberId)) {
+ public boolean removeClusterChannel(String clusterGroupId) {
+ if (!ClusterMemberUtilities.isValidChannelName(clusterGroupId)) {
+ m_logService.log(LogService.LOG_ERROR, "Failed to remove
clusterchannel. Invalid clusterChannelName: "
+ + clusterGroupId);
return false;
}
m_clusterMemberComponentsLock.writeLock().lock();
try {
- if (m_clusterMemberComponents.containsKey(getKey(clusterGroupId,
clusterMemberId, ""))) {
- m_dependencyManager.remove(m_clusterMemberComponents
- .remove(getKey(clusterGroupId, clusterMemberId, "")));
+ if (m_clusterMemberComponents.containsKey(clusterGroupId)) {
+
m_dependencyManager.remove(m_clusterMemberComponents.remove(clusterGroupId));
return true;
}
return false;
@@ -184,15 +179,25 @@
}
}
- public boolean createDiscovery(String clusterGroupId, String
serviceGroupId) {
- // FIXME improve checking
+ public boolean createDiscovery(String clusterChannelName, String
serviceGroupName) {
+ if (!ClusterMemberUtilities.isValidChannelName(clusterChannelName)) {
+ m_logService
+ .log(LogService.LOG_ERROR, "Failed to create discovery.
Invalid clusterChannelName: "
+ + clusterChannelName);
+ return false;
+ }
+ if (!ClusterMemberUtilities.isValidChannelName(serviceGroupName)) {
+ m_logService.log(LogService.LOG_ERROR, "Failed to create
discovery. Invalid serviceGroupName: "
+ + serviceGroupName);
+ return false;
+ }
Component discoveryComponent =
m_dependencyManager
.createComponent()
.setInterface(DiscoveryService.class.getName(),
null)
- .setFactory(new DiscoveryServiceFactory(clusterGroupId,
serviceGroupId), "getInstance")
+ .setFactory(new DiscoveryServiceFactory(clusterChannelName,
serviceGroupName), "getInstance")
.add(
m_dependencyManager.createServiceDependency()
.setService(FabricManagerService.class)
@@ -200,11 +205,11 @@
m_discoveryComponentsLock.writeLock().lock();
try {
- if (m_discoveryComponents.containsKey(getKey(clusterGroupId, "",
serviceGroupId))) {
-
m_dependencyManager.remove(m_discoveryComponents.remove(getKey(clusterGroupId,
"",
- serviceGroupId)));
+ if (m_discoveryComponents.containsKey(getKey(clusterChannelName,
serviceGroupName))) {
+
m_dependencyManager.remove(m_discoveryComponents.remove(getKey(clusterChannelName,
+ serviceGroupName)));
}
- m_discoveryComponents.put(getKey(clusterGroupId, "",
serviceGroupId), discoveryComponent);
+ m_discoveryComponents.put(getKey(clusterChannelName,
serviceGroupName), discoveryComponent);
m_dependencyManager.add(discoveryComponent);
return true;
}
@@ -213,14 +218,23 @@
}
}
- public boolean removeDiscovery(String clusterGroupId, String
serviceGroupId) {
- // FIXME improve checks
-
+ public boolean removeDiscovery(String clusterChannelName, String
serviceGroupName) {
+ if (!ClusterMemberUtilities.isValidChannelName(clusterChannelName)) {
+ m_logService
+ .log(LogService.LOG_ERROR, "Failed to remove discovery.
Invalid clusterChannelName: "
+ + clusterChannelName);
+ return false;
+ }
+ if (!ClusterMemberUtilities.isValidChannelName(serviceGroupName)) {
+ m_logService.log(LogService.LOG_ERROR, "Failed to remove
discovery. Invalid serviceGroupName: "
+ + serviceGroupName);
+ return false;
+ }
m_discoveryComponentsLock.writeLock().lock();
try {
- if (m_discoveryComponents.containsKey(getKey(clusterGroupId, "",
serviceGroupId))) {
-
m_dependencyManager.remove(m_discoveryComponents.remove(getKey(clusterGroupId,
"",
- serviceGroupId)));
+ if (m_discoveryComponents.containsKey(getKey(clusterChannelName,
serviceGroupName))) {
+
m_dependencyManager.remove(m_discoveryComponents.remove(getKey(clusterChannelName,
+ serviceGroupName)));
return true;
}
return false;
@@ -230,14 +244,22 @@
}
}
- public boolean createDistribution(String clusterGroupId, String
serviceGroupId) {
- // FIXME improve checks
-
+ public boolean createDistribution(String clusterChannelName, String
serviceGroupName) {
+ if (!ClusterMemberUtilities.isValidChannelName(clusterChannelName)) {
+ m_logService
+ .log(LogService.LOG_ERROR, "Failed to create distribution.
Invalid clustername: " + clusterChannelName);
+ return false;
+ }
+ if (!ClusterMemberUtilities.isValidChannelName(serviceGroupName)) {
+ m_logService.log(LogService.LOG_ERROR, "Failed to remove
distribution. Invalid servicegroupname: "
+ + serviceGroupName);
+ return false;
+ }
Component distributionComponent =
m_dependencyManager
.createComponent()
.setInterface(DistributionService.class.getName(), null)
- .setFactory(new DistributionServiceFactory(clusterGroupId,
serviceGroupId), "getInstance")
+ .setFactory(new DistributionServiceFactory(clusterChannelName,
serviceGroupName), "getInstance")
.add(
m_dependencyManager.createServiceDependency()
.setService(FabricManagerService.class)
@@ -245,11 +267,11 @@
m_distributionComponentsLock.writeLock().lock();
try {
- if (m_distributionComponents.containsKey(getKey(clusterGroupId,
"", serviceGroupId))) {
-
m_dependencyManager.remove(m_distributionComponents.remove(getKey(clusterGroupId,
"",
- serviceGroupId)));
+ if
(m_distributionComponents.containsKey(getKey(clusterChannelName,
serviceGroupName))) {
+
m_dependencyManager.remove(m_distributionComponents.remove(getKey(clusterChannelName,
+ serviceGroupName)));
}
- m_distributionComponents.put(getKey(clusterGroupId, "",
serviceGroupId),
+ m_distributionComponents.put(getKey(clusterChannelName,
serviceGroupName),
distributionComponent);
m_dependencyManager.add(distributionComponent);
return true;
@@ -259,14 +281,23 @@
}
}
- public boolean removeDistribution(String clusterGroupId, String
serviceGroupId) {
- // TODO improve checks
-
+ public boolean removeDistribution(String clusterChannelName, String
serviceGroupName) {
+ if (!ClusterMemberUtilities.isValidChannelName(clusterChannelName)) {
+ m_logService
+ .log(LogService.LOG_ERROR, "Failed to create distribution.
Invalid clusterChannelname: "
+ + clusterChannelName);
+ return false;
+ }
+ if (!ClusterMemberUtilities.isValidChannelName(serviceGroupName)) {
+ m_logService.log(LogService.LOG_ERROR, "Failed to remove
distribution. Invalid serviceGroupName: "
+ + serviceGroupName);
+ return false;
+ }
m_distributionComponentsLock.writeLock().lock();
try {
- if (m_distributionComponents.containsKey(getKey(clusterGroupId,
"", serviceGroupId))) {
-
m_dependencyManager.remove(m_distributionComponents.remove(getKey(clusterGroupId,
"",
- serviceGroupId)));
+ if
(m_distributionComponents.containsKey(getKey(clusterChannelName,
serviceGroupName))) {
+
m_dependencyManager.remove(m_distributionComponents.remove(getKey(clusterChannelName,
+ serviceGroupName)));
return true;
}
return false;
@@ -280,8 +311,8 @@
* Private methods
********************************************************/
- private String getKey(String clusterGroupId, String clusterMemberId,
String serviceGroupId) {
- return clusterGroupId + "#" + clusterMemberId + "#" + serviceGroupId;
+ private String getKey(String clusterChannelName, String serviceGroupName) {
+ return clusterChannelName + "#" + serviceGroupName;
}
/********************************************************
@@ -290,49 +321,47 @@
static class ClusterMemberServiceFactory {
- private final String m_clusterGroupId;
- private final String m_clusterMemberId;
- private final Dictionary<String, Object> m_properties;
-
- public ClusterMemberServiceFactory(String clusterGroupId, String
clusterMemberId,
- Dictionary<String, Object> properties) {
- m_clusterGroupId = clusterGroupId;
- m_clusterMemberId = clusterMemberId;
- m_properties = properties;
+ private final String m_clusterChannelName;
+ private final Dictionary<String, Object> m_clusterChannelProperties;
+
+ public ClusterMemberServiceFactory(String clusterChannelName,
+ Dictionary<String, Object> clusterChannelProperties) {
+ m_clusterChannelName = clusterChannelName;
+ m_clusterChannelProperties = clusterChannelProperties;
}
public ClusterMemberService getInstance() {
- return new TribesClusterMemberServiceImpl(m_clusterGroupId,
m_clusterMemberId, m_properties);
+ return new TribesClusterMemberServiceImpl(m_clusterChannelName,
m_clusterChannelProperties);
}
}
static class DiscoveryServiceFactory {
- private final String m_clusterGroupId;
- private final String m_serviceGroupId;
+ private final String m_clusterChannelName;
+ private final String m_serviceGroupName;
- public DiscoveryServiceFactory(String clusterGroupId, String
serviceGroupId) {
- m_clusterGroupId = clusterGroupId;
- m_serviceGroupId = serviceGroupId;
+ public DiscoveryServiceFactory(String clusterChannelName, String
serviceGroupName) {
+ m_clusterChannelName = clusterChannelName;
+ m_serviceGroupName = serviceGroupName;
}
public DiscoveryService getInstance() {
- return new DiscoveryServiceImpl(m_clusterGroupId,
m_serviceGroupId);
+ return new DiscoveryServiceImpl(m_clusterChannelName,
m_serviceGroupName);
}
}
static class DistributionServiceFactory {
- private final String m_clusterGroupId;
- private final String m_serviceGroupId;
+ private final String m_clusterChannelName;
+ private final String m_serviceGroupName;
- public DistributionServiceFactory(String clusterGroupId, String
serviceGroupId) {
- m_clusterGroupId = clusterGroupId;
- m_serviceGroupId = serviceGroupId;
+ public DistributionServiceFactory(String clusterChannelName, String
serviceGroupName) {
+ m_clusterChannelName = clusterChannelName;
+ m_serviceGroupName = serviceGroupName;
}
public DistributionService getInstance() {
- return new DistributionServiceImpl(m_clusterGroupId,
m_serviceGroupId);
+ return new DistributionServiceImpl(m_clusterChannelName,
m_serviceGroupName);
}
}
}