This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch 10.1.x in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/10.1.x by this push: new f64ed49efe Code clean-up - no functional change f64ed49efe is described below commit f64ed49efea733637987e01716569ca172fa9e49 Author: Mark Thomas <ma...@apache.org> AuthorDate: Fri May 10 13:45:49 2024 +0100 Code clean-up - no functional change --- .../catalina/tribes/group/AbsoluteOrder.java | 62 ++--- .../catalina/tribes/group/ChannelCoordinator.java | 147 ++++++------ .../tribes/group/ChannelInterceptorBase.java | 30 +-- .../catalina/tribes/group/ExtendedRpcCallback.java | 18 +- .../apache/catalina/tribes/group/GroupChannel.java | 257 ++++++++++----------- .../catalina/tribes/group/GroupChannelMBean.java | 10 +- .../catalina/tribes/group/InterceptorPayload.java | 2 +- .../org/apache/catalina/tribes/group/Response.java | 1 + .../apache/catalina/tribes/group/RpcCallback.java | 15 +- .../apache/catalina/tribes/group/RpcChannel.java | 111 ++++----- .../apache/catalina/tribes/group/RpcMessage.java | 8 +- 11 files changed, 328 insertions(+), 333 deletions(-) diff --git a/java/org/apache/catalina/tribes/group/AbsoluteOrder.java b/java/org/apache/catalina/tribes/group/AbsoluteOrder.java index 1e06dc1f09..df889928e7 100644 --- a/java/org/apache/catalina/tribes/group/AbsoluteOrder.java +++ b/java/org/apache/catalina/tribes/group/AbsoluteOrder.java @@ -24,19 +24,22 @@ import java.util.List; import org.apache.catalina.tribes.Member; /** - * <p>Title: Membership - Absolute Order</p> - * - * <p>Description: A simple, yet agreeable and efficient way of ordering members</p> * <p> - * Ordering members can serve as a basis for electing a leader or coordinating efforts.<br> - * This is stinky simple, it works on the basis of the <code>Member</code> interface - * and orders members in the following format: + * Title: Membership - Absolute Order + * </p> + * <p> + * Description: A simple, yet agreeable and efficient way of ordering members + * </p> + * <p> + * Ordering members can serve as a basis for electing a leader or coordinating efforts.<br> + * This is stinky simple, it works on the basis of the <code>Member</code> interface and orders members in the following + * format: * </p> * <ol> - * <li>IP comparison - byte by byte, lower byte higher rank</li> - * <li>IPv4 addresses rank higher than IPv6, ie the lesser number of bytes, the higher rank</li> - * <li>Port comparison - lower port, higher rank</li> - * <li>UniqueId comparison- byte by byte, lower byte higher rank</li> + * <li>IP comparison - byte by byte, lower byte higher rank</li> + * <li>IPv4 addresses rank higher than IPv6, ie the lesser number of bytes, the higher rank</li> + * <li>Port comparison - lower port, higher rank</li> + * <li>UniqueId comparison- byte by byte, lower byte higher rank</li> * </ol> * * @see org.apache.catalina.tribes.Member @@ -50,55 +53,54 @@ public class AbsoluteOrder { public static void absoluteOrder(Member[] members) { - if ( members == null || members.length <= 1 ) { + if (members == null || members.length <= 1) { return; } - Arrays.sort(members,comp); + Arrays.sort(members, comp); } public static void absoluteOrder(List<Member> members) { - if ( members == null || members.size() <= 1 ) { + if (members == null || members.size() <= 1) { return; } members.sort(comp); } - public static class AbsoluteComparator implements Comparator<Member>, - Serializable { + public static class AbsoluteComparator implements Comparator<Member>, Serializable { private static final long serialVersionUID = 1L; @Override public int compare(Member m1, Member m2) { - int result = compareIps(m1,m2); - if ( result == 0 ) { - result = comparePorts(m1,m2); + int result = compareIps(m1, m2); + if (result == 0) { + result = comparePorts(m1, m2); } - if ( result == 0 ) { - result = compareIds(m1,m2); + if (result == 0) { + result = compareIds(m1, m2); } return result; } public int compareIps(Member m1, Member m2) { - return compareBytes(m1.getHost(),m2.getHost()); + return compareBytes(m1.getHost(), m2.getHost()); } public int comparePorts(Member m1, Member m2) { - return compareInts(m1.getPort(),m2.getPort()); + return compareInts(m1.getPort(), m2.getPort()); } public int compareIds(Member m1, Member m2) { - return compareBytes(m1.getUniqueId(),m2.getUniqueId()); + return compareBytes(m1.getUniqueId(), m2.getUniqueId()); } protected int compareBytes(byte[] d1, byte[] d2) { int result = 0; - if ( d1.length == d2.length ) { - for (int i=0; (result==0) && (i<d1.length); i++) { - result = compareBytes(d1[i],d2[i]); + if (d1.length == d2.length) { + for (int i = 0; (result == 0) && (i < d1.length); i++) { + result = compareBytes(d1[i], d2[i]); } - } else if ( d1.length < d2.length) { + } else if (d1.length < d2.length) { result = -1; } else { result = 1; @@ -107,14 +109,14 @@ public class AbsoluteOrder { } protected int compareBytes(byte b1, byte b2) { - return compareInts(b1,b2); + return compareInts(b1, b2); } protected int compareInts(int b1, int b2) { int result = 0; - if ( b1 == b2 ) { + if (b1 == b2) { - } else if ( b1 < b2) { + } else if (b1 < b2) { result = -1; } else { result = 1; diff --git a/java/org/apache/catalina/tribes/group/ChannelCoordinator.java b/java/org/apache/catalina/tribes/group/ChannelCoordinator.java index 3f93572a77..bd744c1a27 100644 --- a/java/org/apache/catalina/tribes/group/ChannelCoordinator.java +++ b/java/org/apache/catalina/tribes/group/ChannelCoordinator.java @@ -35,9 +35,8 @@ import org.apache.catalina.tribes.util.Logs; import org.apache.catalina.tribes.util.StringManager; /** - * The channel coordinator object coordinates the membership service, - * the sender and the receiver. - * This is the last interceptor in the chain. + * The channel coordinator object coordinates the membership service, the sender and the receiver. This is the last + * interceptor in the chain. */ public class ChannelCoordinator extends ChannelInterceptorBase implements MessageListener { protected static final StringManager sm = StringManager.getManager(ChannelCoordinator.class); @@ -48,16 +47,12 @@ public class ChannelCoordinator extends ChannelInterceptorBase implements Messag private int startLevel = 0; public ChannelCoordinator() { - this(new NioReceiver(), new ReplicationTransmitter(), - new McastService()); + this(new NioReceiver(), new ReplicationTransmitter(), new McastService()); } - public ChannelCoordinator(ChannelReceiver receiver, - ChannelSender sender, - MembershipService service) { + public ChannelCoordinator(ChannelReceiver receiver, ChannelSender sender, MembershipService service) { - this.optionFlag = Channel.SEND_OPTIONS_BYTE_MESSAGE | - Channel.SEND_OPTIONS_USE_ACK | + this.optionFlag = Channel.SEND_OPTIONS_BYTE_MESSAGE | Channel.SEND_OPTIONS_USE_ACK | Channel.SEND_OPTIONS_SYNCHRONIZED_ACK; this.setClusterReceiver(receiver); @@ -67,25 +62,25 @@ public class ChannelCoordinator extends ChannelInterceptorBase implements Messag /** * Send a message to one or more members in the cluster + * * @param destination Member[] - the destinations, null or zero length means all - * @param msg ClusterMessage - the message to send - * @param payload TBA + * @param msg ClusterMessage - the message to send + * @param payload TBA */ @Override public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { - if ( destination == null ) { + if (destination == null) { destination = membershipService.getMembers(); } - if ((msg.getOptions()&Channel.SEND_OPTIONS_MULTICAST) == Channel.SEND_OPTIONS_MULTICAST) { + if ((msg.getOptions() & Channel.SEND_OPTIONS_MULTICAST) == Channel.SEND_OPTIONS_MULTICAST) { membershipService.broadcast(msg); } else { - clusterSender.sendMessage(msg,destination); + clusterSender.sendMessage(msg, destination); } - if ( Logs.MESSAGES.isTraceEnabled() ) { - Logs.MESSAGES.trace("ChannelCoordinator - Sent msg:" + new UniqueId(msg.getUniqueId()) + - " at " + new java.sql.Timestamp(System.currentTimeMillis()) + " to " + - Arrays.toNameString(destination)); + if (Logs.MESSAGES.isTraceEnabled()) { + Logs.MESSAGES.trace("ChannelCoordinator - Sent msg:" + new UniqueId(msg.getUniqueId()) + " at " + + new java.sql.Timestamp(System.currentTimeMillis()) + " to " + Arrays.toNameString(destination)); } } @@ -102,75 +97,73 @@ public class ChannelCoordinator extends ChannelInterceptorBase implements Messag /** - * Starts up the channel. This can be called multiple times for individual services to start - * The svc parameter can be the logical or value of any constants + * Starts up the channel. This can be called multiple times for individual services to start The svc parameter can + * be the logical or value of any constants + * * @param svc int value of <BR> - * DEFAULT - will start all services <BR> - * MBR_RX_SEQ - starts the membership receiver <BR> - * MBR_TX_SEQ - starts the membership broadcaster <BR> - * SND_TX_SEQ - starts the replication transmitter<BR> - * SND_RX_SEQ - starts the replication receiver<BR> + * DEFAULT - will start all services <BR> + * MBR_RX_SEQ - starts the membership receiver <BR> + * MBR_TX_SEQ - starts the membership broadcaster <BR> + * SND_TX_SEQ - starts the replication transmitter<BR> + * SND_RX_SEQ - starts the replication receiver<BR> + * * @throws ChannelException if a startup error occurs or the service is already started. */ protected synchronized void internalStart(int svc) throws ChannelException { try { boolean valid = false; - //make sure we don't pass down any flags that are unrelated to the bottom layer + // make sure we don't pass down any flags that are unrelated to the bottom layer svc = svc & Channel.DEFAULT; - if (startLevel == Channel.DEFAULT) - { - return; //we have already started up all components + if (startLevel == Channel.DEFAULT) { + return; // we have already started up all components } - if (svc == 0 ) - { - return;//nothing to start + if (svc == 0) { + return;// nothing to start } if (svc == (svc & startLevel)) { - throw new ChannelException(sm.getString("channelCoordinator.alreadyStarted", - Integer.toString(svc))); + throw new ChannelException(sm.getString("channelCoordinator.alreadyStarted", Integer.toString(svc))); } - //must start the receiver first so that we can coordinate the port it - //listens to with the local membership settings - if ( Channel.SND_RX_SEQ==(svc & Channel.SND_RX_SEQ) ) { + // must start the receiver first so that we can coordinate the port it + // listens to with the local membership settings + if (Channel.SND_RX_SEQ == (svc & Channel.SND_RX_SEQ)) { clusterReceiver.setMessageListener(this); clusterReceiver.setChannel(getChannel()); clusterReceiver.start(); - //synchronize, big time FIXME + // synchronize, big time FIXME Member localMember = getChannel().getLocalMember(false); if (localMember instanceof StaticMember) { // static member - StaticMember staticMember = (StaticMember)localMember; + StaticMember staticMember = (StaticMember) localMember; staticMember.setHost(getClusterReceiver().getHost()); staticMember.setPort(getClusterReceiver().getPort()); staticMember.setSecurePort(getClusterReceiver().getSecurePort()); } else { // multicast member membershipService.setLocalMemberProperties(getClusterReceiver().getHost(), - getClusterReceiver().getPort(), - getClusterReceiver().getSecurePort(), + getClusterReceiver().getPort(), getClusterReceiver().getSecurePort(), getClusterReceiver().getUdpPort()); } valid = true; } - if ( Channel.SND_TX_SEQ==(svc & Channel.SND_TX_SEQ) ) { + if (Channel.SND_TX_SEQ == (svc & Channel.SND_TX_SEQ)) { clusterSender.setChannel(getChannel()); clusterSender.start(); valid = true; } - if ( Channel.MBR_RX_SEQ==(svc & Channel.MBR_RX_SEQ) ) { + if (Channel.MBR_RX_SEQ == (svc & Channel.MBR_RX_SEQ)) { membershipService.setMembershipListener(this); membershipService.setChannel(getChannel()); if (membershipService instanceof McastService) { - ((McastService)membershipService).setMessageListener(this); + ((McastService) membershipService).setMessageListener(this); } membershipService.start(MembershipService.MBR_RX); valid = true; } - if ( Channel.MBR_TX_SEQ==(svc & Channel.MBR_TX_SEQ) ) { + if (Channel.MBR_TX_SEQ == (svc & Channel.MBR_TX_SEQ)) { membershipService.setChannel(getChannel()); membershipService.start(MembershipService.MBR_TX); valid = true; @@ -180,59 +173,59 @@ public class ChannelCoordinator extends ChannelInterceptorBase implements Messag throw new IllegalArgumentException(sm.getString("channelCoordinator.invalid.startLevel")); } startLevel = (startLevel | svc); - }catch ( ChannelException cx ) { + } catch (ChannelException cx) { throw cx; - }catch ( Exception x ) { + } catch (Exception x) { throw new ChannelException(x); } } /** - * Shuts down the channel. This can be called multiple times for individual services to shutdown - * The svc parameter can be the logical or value of any constants + * Shuts down the channel. This can be called multiple times for individual services to shutdown The svc parameter + * can be the logical or value of any constants + * * @param svc int value of <BR> - * DEFAULT - will shutdown all services <BR> - * MBR_RX_SEQ - starts the membership receiver <BR> - * MBR_TX_SEQ - starts the membership broadcaster <BR> - * SND_TX_SEQ - starts the replication transmitter<BR> - * SND_RX_SEQ - starts the replication receiver<BR> + * DEFAULT - will shutdown all services <BR> + * MBR_RX_SEQ - starts the membership receiver <BR> + * MBR_TX_SEQ - starts the membership broadcaster <BR> + * SND_TX_SEQ - starts the replication transmitter<BR> + * SND_RX_SEQ - starts the replication receiver<BR> + * * @throws ChannelException if a startup error occurs or the service is already started. */ protected synchronized void internalStop(int svc) throws ChannelException { try { - //make sure we don't pass down any flags that are unrelated to the bottom layer + // make sure we don't pass down any flags that are unrelated to the bottom layer svc = svc & Channel.DEFAULT; - if (startLevel == 0) - { - return; //we have already stopped up all components + if (startLevel == 0) { + return; // we have already stopped up all components } - if (svc == 0 ) - { - return;//nothing to stop + if (svc == 0) { + return;// nothing to stop } boolean valid = false; - if ( Channel.MBR_TX_SEQ==(svc & Channel.MBR_TX_SEQ) ) { + if (Channel.MBR_TX_SEQ == (svc & Channel.MBR_TX_SEQ)) { membershipService.stop(MembershipService.MBR_TX); valid = true; } - if ( Channel.MBR_RX_SEQ==(svc & Channel.MBR_RX_SEQ) ) { + if (Channel.MBR_RX_SEQ == (svc & Channel.MBR_RX_SEQ)) { membershipService.stop(MembershipService.MBR_RX); membershipService.setMembershipListener(null); valid = true; } - if ( Channel.SND_TX_SEQ==(svc & Channel.SND_TX_SEQ) ) { + if (Channel.SND_TX_SEQ == (svc & Channel.SND_TX_SEQ)) { clusterSender.stop(); valid = true; } - if ( Channel.SND_RX_SEQ==(svc & Channel.SND_RX_SEQ) ) { + if (Channel.SND_RX_SEQ == (svc & Channel.SND_RX_SEQ)) { clusterReceiver.stop(); clusterReceiver.setMessageListener(null); valid = true; } - if ( !valid) { + if (!valid) { throw new IllegalArgumentException(sm.getString("channelCoordinator.invalid.startLevel")); } @@ -244,24 +237,22 @@ public class ChannelCoordinator extends ChannelInterceptorBase implements Messag } @Override - public void memberAdded(Member member){ + public void memberAdded(Member member) { SenderState.getSenderState(member); super.memberAdded(member); } @Override - public void memberDisappeared(Member member){ + public void memberDisappeared(Member member) { SenderState.removeSenderState(member); super.memberDisappeared(member); } @Override public void messageReceived(ChannelMessage msg) { - if ( Logs.MESSAGES.isTraceEnabled() ) { - Logs.MESSAGES.trace("ChannelCoordinator - Received msg:" + - new UniqueId(msg.getUniqueId()) + " at " + - new java.sql.Timestamp(System.currentTimeMillis()) + " from " + - msg.getAddress().getName()); + if (Logs.MESSAGES.isTraceEnabled()) { + Logs.MESSAGES.trace("ChannelCoordinator - Received msg:" + new UniqueId(msg.getUniqueId()) + " at " + + new java.sql.Timestamp(System.currentTimeMillis()) + " from " + msg.getAddress().getName()); } super.messageReceived(msg); } @@ -284,11 +275,11 @@ public class ChannelCoordinator extends ChannelInterceptorBase implements Messag } public void setClusterReceiver(ChannelReceiver clusterReceiver) { - if ( clusterReceiver != null ) { + if (clusterReceiver != null) { this.clusterReceiver = clusterReceiver; this.clusterReceiver.setMessageListener(this); } else { - if (this.clusterReceiver!=null ) { + if (this.clusterReceiver != null) { this.clusterReceiver.setMessageListener(null); } this.clusterReceiver = null; @@ -306,7 +297,7 @@ public class ChannelCoordinator extends ChannelInterceptorBase implements Messag @Override public void heartbeat() { - if ( clusterSender!=null ) { + if (clusterSender != null) { clusterSender.heartbeat(); } super.heartbeat(); @@ -323,7 +314,7 @@ public class ChannelCoordinator extends ChannelInterceptorBase implements Messag } @Override - public Member getMember(Member mbr){ + public Member getMember(Member mbr) { return this.getMembershipService().getMember(mbr); } diff --git a/java/org/apache/catalina/tribes/group/ChannelInterceptorBase.java b/java/org/apache/catalina/tribes/group/ChannelInterceptorBase.java index 7bb01d5202..fcb7db7925 100644 --- a/java/org/apache/catalina/tribes/group/ChannelInterceptorBase.java +++ b/java/org/apache/catalina/tribes/group/ChannelInterceptorBase.java @@ -33,7 +33,7 @@ public abstract class ChannelInterceptorBase implements ChannelInterceptor { private ChannelInterceptor next; private ChannelInterceptor previous; private Channel channel; - //default value, always process + // default value, always process protected int optionFlag = 0; /** @@ -46,10 +46,10 @@ public abstract class ChannelInterceptorBase implements ChannelInterceptor { } public boolean okToProcess(int messageFlags) { - if (this.optionFlag == 0 ) { + if (this.optionFlag == 0) { return true; } - return ((optionFlag&messageFlags) == optionFlag); + return ((optionFlag & messageFlags) == optionFlag); } @Override @@ -83,8 +83,8 @@ public abstract class ChannelInterceptorBase implements ChannelInterceptor { } @Override - public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws - ChannelException { + public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) + throws ChannelException { if (getNext() != null) { getNext().sendMessage(destination, msg, payload); } @@ -99,7 +99,7 @@ public abstract class ChannelInterceptorBase implements ChannelInterceptor { @Override public void memberAdded(Member member) { - //notify upwards + // notify upwards if (getPrevious() != null) { getPrevious().memberAdded(member); } @@ -107,7 +107,7 @@ public abstract class ChannelInterceptorBase implements ChannelInterceptor { @Override public void memberDisappeared(Member member) { - //notify upwards + // notify upwards if (getPrevious() != null) { getPrevious().memberDisappeared(member); } @@ -122,7 +122,7 @@ public abstract class ChannelInterceptorBase implements ChannelInterceptor { @Override public boolean hasMembers() { - if ( getNext()!=null ) { + if (getNext() != null) { return getNext().hasMembers(); } else { return false; @@ -131,7 +131,7 @@ public abstract class ChannelInterceptorBase implements ChannelInterceptor { @Override public Member[] getMembers() { - if ( getNext()!=null ) { + if (getNext() != null) { return getNext().getMembers(); } else { return null; @@ -140,7 +140,7 @@ public abstract class ChannelInterceptorBase implements ChannelInterceptor { @Override public Member getMember(Member mbr) { - if ( getNext()!=null) { + if (getNext() != null) { return getNext().getMember(mbr); } else { return null; @@ -149,7 +149,7 @@ public abstract class ChannelInterceptorBase implements ChannelInterceptor { @Override public Member getLocalMember(boolean incAlive) { - if ( getNext()!=null ) { + if (getNext() != null) { return getNext().getLocalMember(incAlive); } else { return null; @@ -158,14 +158,14 @@ public abstract class ChannelInterceptorBase implements ChannelInterceptor { @Override public void start(int svc) throws ChannelException { - if ( getNext()!=null ) { + if (getNext() != null) { getNext().start(svc); } // register jmx JmxRegistry jmxRegistry = JmxRegistry.getRegistry(channel); if (jmxRegistry != null) { - this.oname = jmxRegistry.registerJmx( - ",component=Interceptor,interceptorName=" + getClass().getSimpleName(), this); + this.oname = jmxRegistry.registerJmx(",component=Interceptor,interceptorName=" + getClass().getSimpleName(), + this); } } @@ -183,7 +183,7 @@ public abstract class ChannelInterceptorBase implements ChannelInterceptor { @Override public void fireInterceptorEvent(InterceptorEvent event) { - //empty operation + // empty operation } @Override diff --git a/java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java b/java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java index c5e460be8e..4902071f02 100644 --- a/java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java +++ b/java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java @@ -19,27 +19,29 @@ package org.apache.catalina.tribes.group; import java.io.Serializable; import org.apache.catalina.tribes.Member; + /** - * Extension to the {@link RpcCallback} interface. Allows an RPC messenger to get a confirmation if the reply - * was sent successfully to the original sender. - * + * Extension to the {@link RpcCallback} interface. Allows an RPC messenger to get a confirmation if the reply was sent + * successfully to the original sender. */ public interface ExtendedRpcCallback extends RpcCallback { /** * The reply failed. - * @param request - the original message that requested the reply + * + * @param request - the original message that requested the reply * @param response - the reply message to the original message - * @param sender - the sender requested that reply - * @param reason - the reason the reply failed + * @param sender - the sender requested that reply + * @param reason - the reason the reply failed */ void replyFailed(Serializable request, Serializable response, Member sender, Exception reason); /** * The reply succeeded - * @param request - the original message that requested the reply + * + * @param request - the original message that requested the reply * @param response - the reply message to the original message - * @param sender - the sender requested that reply + * @param sender - the sender requested that reply */ void replySucceeded(Serializable request, Serializable response, Member sender); } diff --git a/java/org/apache/catalina/tribes/group/GroupChannel.java b/java/org/apache/catalina/tribes/group/GroupChannel.java index 7ad0eb036d..31f1c074c1 100644 --- a/java/org/apache/catalina/tribes/group/GroupChannel.java +++ b/java/org/apache/catalina/tribes/group/GroupChannel.java @@ -60,28 +60,26 @@ import org.apache.juli.logging.LogFactory; /** * The default implementation of a Channel.<br> - * The GroupChannel manages the replication channel. It coordinates - * message being sent and received with membership announcements. - * The channel has an chain of interceptors that can modify the message or perform other logic.<br> + * The GroupChannel manages the replication channel. It coordinates message being sent and received with membership + * announcements. The channel has an chain of interceptors that can modify the message or perform other logic.<br> * It manages a complete group, both membership and replication. */ -public class GroupChannel extends ChannelInterceptorBase - implements ManagedChannel, JmxChannel, GroupChannelMBean { +public class GroupChannel extends ChannelInterceptorBase implements ManagedChannel, JmxChannel, GroupChannelMBean { private static final Log log = LogFactory.getLog(GroupChannel.class); protected static final StringManager sm = StringManager.getManager(GroupChannel.class); /** - * Flag to determine if the channel manages its own heartbeat - * If set to true, the channel will start a local thread for the heart beat. + * Flag to determine if the channel manages its own heartbeat If set to true, the channel will start a local thread + * for the heart beat. */ protected boolean heartbeat = true; /** - * If <code>heartbeat == true</code> then how often do we want this - * heartbeat to run. The default value is 5000 milliseconds. + * If <code>heartbeat == true</code> then how often do we want this heartbeat to run. The default value is 5000 + * milliseconds. */ - protected long heartbeatSleeptime = 5*1000; + protected long heartbeatSleeptime = 5 * 1000; /** * Internal heartbeat future @@ -90,7 +88,7 @@ public class GroupChannel extends ChannelInterceptorBase protected ScheduledFuture<?> monitorFuture; /** - * The <code>ChannelCoordinator</code> coordinates the bottom layer components:<br> + * The <code>ChannelCoordinator</code> coordinates the bottom layer components:<br> * - MembershipService<br> * - ChannelSender <br> * - ChannelReceiver<br> @@ -98,9 +96,8 @@ public class GroupChannel extends ChannelInterceptorBase protected final ChannelCoordinator coordinator = new ChannelCoordinator(); /** - * The first interceptor in the interceptor stack. - * The interceptors are chained in a linked list, so we only need a reference to the - * first one + * The first interceptor in the interceptor stack. The interceptors are chained in a linked list, so we only need a + * reference to the first one */ protected ChannelInterceptor interceptors = null; @@ -150,8 +147,7 @@ public class GroupChannel extends ChannelInterceptorBase private ObjectName oname = null; /** - * Creates a GroupChannel. This constructor will also - * add the first interceptor in the GroupChannel.<br> + * Creates a GroupChannel. This constructor will also add the first interceptor in the GroupChannel.<br> * The first interceptor is always the channel itself. */ public GroupChannel() { @@ -161,14 +157,14 @@ public class GroupChannel extends ChannelInterceptorBase @Override public void addInterceptor(ChannelInterceptor interceptor) { - if ( interceptors == null ) { + if (interceptors == null) { interceptors = interceptor; interceptors.setNext(coordinator); interceptors.setPrevious(null); coordinator.setPrevious(interceptors); } else { ChannelInterceptor last = interceptors; - while ( last.getNext() != coordinator ) { + while (last.getNext() != coordinator) { last = last.getNext(); } last.setNext(interceptor); @@ -180,37 +176,36 @@ public class GroupChannel extends ChannelInterceptorBase /** * Sends a heartbeat through the interceptor stack.<br> - * Invoke this method from the application on a periodic basis if - * you have turned off internal heartbeats <code>channel.setHeartbeat(false)</code> + * Invoke this method from the application on a periodic basis if you have turned off internal heartbeats + * <code>channel.setHeartbeat(false)</code> */ @Override public void heartbeat() { super.heartbeat(); for (MembershipListener listener : membershipListeners) { - if ( listener instanceof Heartbeat ) { - ((Heartbeat)listener).heartbeat(); + if (listener instanceof Heartbeat) { + ((Heartbeat) listener).heartbeat(); } } for (ChannelListener listener : channelListeners) { - if ( listener instanceof Heartbeat ) { - ((Heartbeat)listener).heartbeat(); + if (listener instanceof Heartbeat) { + ((Heartbeat) listener).heartbeat(); } } } @Override - public UniqueId send(Member[] destination, Serializable msg, int options) - throws ChannelException { - return send(destination,msg,options,null); + public UniqueId send(Member[] destination, Serializable msg, int options) throws ChannelException { + return send(destination, msg, options, null); } @Override public UniqueId send(Member[] destination, Serializable msg, int options, ErrorHandler handler) throws ChannelException { - if ( msg == null ) { + if (msg == null) { throw new ChannelException(sm.getString("groupChannel.nullMessage")); } XByteBuffer buffer = null; @@ -218,41 +213,39 @@ public class GroupChannel extends ChannelInterceptorBase if (destination == null || destination.length == 0) { throw new ChannelException(sm.getString("groupChannel.noDestination")); } - ChannelData data = new ChannelData(true);//generates a unique Id + ChannelData data = new ChannelData(true);// generates a unique Id data.setAddress(getLocalMember(false)); data.setTimestamp(System.currentTimeMillis()); byte[] b = null; - if ( msg instanceof ByteMessage ){ - b = ((ByteMessage)msg).getMessage(); + if (msg instanceof ByteMessage) { + b = ((ByteMessage) msg).getMessage(); options = options | SEND_OPTIONS_BYTE_MESSAGE; } else { b = XByteBuffer.serialize(msg); options = options & (~SEND_OPTIONS_BYTE_MESSAGE); } data.setOptions(options); - //XByteBuffer buffer = new XByteBuffer(b.length+128,false); - buffer = BufferPool.getBufferPool().getBuffer(b.length+128, false); - buffer.append(b,0,b.length); + // XByteBuffer buffer = new XByteBuffer(b.length+128,false); + buffer = BufferPool.getBufferPool().getBuffer(b.length + 128, false); + buffer.append(b, 0, b.length); data.setMessage(buffer); InterceptorPayload payload = null; - if ( handler != null ) { + if (handler != null) { payload = new InterceptorPayload(); payload.setErrorHandler(handler); } getFirstInterceptor().sendMessage(destination, data, payload); - if ( Logs.MESSAGES.isTraceEnabled() ) { - Logs.MESSAGES.trace("GroupChannel - Sent msg:" + new UniqueId(data.getUniqueId()) + - " at " + new java.sql.Timestamp(System.currentTimeMillis()) + " to " + - Arrays.toNameString(destination)); - Logs.MESSAGES.trace("GroupChannel - Send Message:" + - new UniqueId(data.getUniqueId()) + " is " + msg); + if (Logs.MESSAGES.isTraceEnabled()) { + Logs.MESSAGES.trace("GroupChannel - Sent msg:" + new UniqueId(data.getUniqueId()) + " at " + + new java.sql.Timestamp(System.currentTimeMillis()) + " to " + Arrays.toNameString(destination)); + Logs.MESSAGES.trace("GroupChannel - Send Message:" + new UniqueId(data.getUniqueId()) + " is " + msg); } return new UniqueId(data.getUniqueId()); } catch (RuntimeException | IOException e) { throw new ChannelException(e); } finally { - if ( buffer != null ) { + if (buffer != null) { BufferPool.getBufferPool().returnBuffer(buffer); } } @@ -261,44 +254,39 @@ public class GroupChannel extends ChannelInterceptorBase /** * Callback from the interceptor stack. <br> - * When a message is received from a remote node, this method will be - * invoked by the previous interceptor.<br> - * This method can also be used to send a message to other components - * within the same application, but its an extreme case, and you're probably - * better off doing that logic between the applications itself. + * When a message is received from a remote node, this method will be invoked by the previous interceptor.<br> + * This method can also be used to send a message to other components within the same application, but its an + * extreme case, and you're probably better off doing that logic between the applications itself. + * * @param msg ChannelMessage */ @Override public void messageReceived(ChannelMessage msg) { - if ( msg == null ) { + if (msg == null) { return; } try { - if ( Logs.MESSAGES.isTraceEnabled() ) { - Logs.MESSAGES.trace("GroupChannel - Received msg:" + - new UniqueId(msg.getUniqueId()) + " at " + - new java.sql.Timestamp(System.currentTimeMillis()) + " from " + - msg.getAddress().getName()); + if (Logs.MESSAGES.isTraceEnabled()) { + Logs.MESSAGES.trace("GroupChannel - Received msg:" + new UniqueId(msg.getUniqueId()) + " at " + + new java.sql.Timestamp(System.currentTimeMillis()) + " from " + msg.getAddress().getName()); } Serializable fwd = null; - if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE ) { + if ((msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE) { fwd = new ByteMessage(msg.getMessage().getBytes()); } else { try { - fwd = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(), 0, - msg.getMessage().getLength()); - }catch (Exception sx) { - log.error(sm.getString("groupChannel.unable.deserialize", msg),sx); + fwd = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(), 0, msg.getMessage().getLength()); + } catch (Exception sx) { + log.error(sm.getString("groupChannel.unable.deserialize", msg), sx); return; } } - if ( Logs.MESSAGES.isTraceEnabled() ) { - Logs.MESSAGES.trace("GroupChannel - Receive Message:" + - new UniqueId(msg.getUniqueId()) + " is " + fwd); + if (Logs.MESSAGES.isTraceEnabled()) { + Logs.MESSAGES.trace("GroupChannel - Receive Message:" + new UniqueId(msg.getUniqueId()) + " is " + fwd); } - //get the actual member with the correct alive time + // get the actual member with the correct alive time Member source = msg.getAddress(); boolean rx = false; boolean delivered = false; @@ -306,61 +294,62 @@ public class GroupChannel extends ChannelInterceptorBase if (channelListener != null && channelListener.accept(fwd, source)) { channelListener.messageReceived(fwd, source); delivered = true; - //if the message was accepted by an RPC channel, that channel - //is responsible for returning the reply, otherwise we send an absence reply + // if the message was accepted by an RPC channel, that channel + // is responsible for returning the reply, otherwise we send an absence reply if (channelListener instanceof RpcChannel) { rx = true; } } - }//for + } // for if ((!rx) && (fwd instanceof RpcMessage)) { - //if we have a message that requires a response, - //but none was given, send back an immediate one - sendNoRpcChannelReply((RpcMessage)fwd,source); + // if we have a message that requires a response, + // but none was given, send back an immediate one + sendNoRpcChannelReply((RpcMessage) fwd, source); } - if ( Logs.MESSAGES.isTraceEnabled() ) { - Logs.MESSAGES.trace("GroupChannel delivered[" + delivered + "] id:" + - new UniqueId(msg.getUniqueId())); + if (Logs.MESSAGES.isTraceEnabled()) { + Logs.MESSAGES.trace("GroupChannel delivered[" + delivered + "] id:" + new UniqueId(msg.getUniqueId())); } - } catch ( Exception x ) { - //this could be the channel listener throwing an exception, we should log it - //as a warning. - if ( log.isWarnEnabled() ) { - log.warn(sm.getString("groupChannel.receiving.error"),x); + } catch (Exception x) { + // this could be the channel listener throwing an exception, we should log it + // as a warning. + if (log.isWarnEnabled()) { + log.warn(sm.getString("groupChannel.receiving.error"), x); } - throw new RemoteProcessException(sm.getString("groupChannel.receiving.error"),x); + throw new RemoteProcessException(sm.getString("groupChannel.receiving.error"), x); } } /** * Sends a <code>NoRpcChannelReply</code> message to a member<br> - * This method gets invoked by the channel if an RPC message comes in - * and no channel listener accepts the message. This avoids timeout - * @param msg RpcMessage + * This method gets invoked by the channel if an RPC message comes in and no channel listener accepts the message. + * This avoids timeout + * + * @param msg RpcMessage * @param destination Member - the destination for the reply */ protected void sendNoRpcChannelReply(RpcMessage msg, Member destination) { try { - //avoid circular loop - if ( msg instanceof RpcMessage.NoRpcChannelReply) { + // avoid circular loop + if (msg instanceof RpcMessage.NoRpcChannelReply) { return; } RpcMessage.NoRpcChannelReply reply = new RpcMessage.NoRpcChannelReply(msg.rpcId, msg.uuid); - send(new Member[]{destination}, reply, SEND_OPTIONS_ASYNCHRONOUS); - } catch ( Exception x ) { - log.error(sm.getString("groupChannel.sendFail.noRpcChannelReply"),x); + send(new Member[] { destination }, reply, SEND_OPTIONS_ASYNCHRONOUS); + } catch (Exception x) { + log.error(sm.getString("groupChannel.sendFail.noRpcChannelReply"), x); } } /** - * memberAdded gets invoked by the interceptor below the channel - * and the channel will broadcast it to the membership listeners + * memberAdded gets invoked by the interceptor below the channel and the channel will broadcast it to the membership + * listeners + * * @param member Member - the new member */ @Override public void memberAdded(Member member) { - //notify upwards + // notify upwards for (MembershipListener membershipListener : membershipListeners) { if (membershipListener != null) { membershipListener.memberAdded(member); @@ -369,13 +358,14 @@ public class GroupChannel extends ChannelInterceptorBase } /** - * memberDisappeared gets invoked by the interceptor below the channel - * and the channel will broadcast it to the membership listeners + * memberDisappeared gets invoked by the interceptor below the channel and the channel will broadcast it to the + * membership listeners + * * @param member Member - the member that left or crashed */ @Override public void memberDisappeared(Member member) { - //notify upwards + // notify upwards for (MembershipListener membershipListener : membershipListeners) { if (membershipListener != null) { membershipListener.memberDisappeared(member); @@ -384,13 +374,12 @@ public class GroupChannel extends ChannelInterceptorBase } /** - * Sets up the default implementation interceptor stack - * if no interceptors have been added + * Sets up the default implementation interceptor stack if no interceptors have been added + * * @throws ChannelException Cluster error */ protected synchronized void setupDefaultStack() throws ChannelException { - if (getFirstInterceptor() != null && - ((getFirstInterceptor().getNext() instanceof ChannelCoordinator))) { + if (getFirstInterceptor() != null && ((getFirstInterceptor().getNext() instanceof ChannelCoordinator))) { addInterceptor(new MessageDispatchInterceptor()); } Iterator<ChannelInterceptor> interceptors = getInterceptors(); @@ -402,20 +391,21 @@ public class GroupChannel extends ChannelInterceptorBase } /** - * Validates the option flags that each interceptor is using and reports - * an error if two interceptor share the same flag. + * Validates the option flags that each interceptor is using and reports an error if two interceptor share the same + * flag. + * * @throws ChannelException Error with option flag */ protected void checkOptionFlags() throws ChannelException { StringBuilder conflicts = new StringBuilder(); ChannelInterceptor first = interceptors; - while ( first != null ) { + while (first != null) { int flag = first.getOptionFlag(); - if ( flag != 0 ) { + if (flag != 0) { ChannelInterceptor next = first.getNext(); - while ( next != null ) { + while (next != null) { int nflag = next.getOptionFlag(); - if (nflag!=0 && (((flag & nflag) == flag ) || ((flag & nflag) == nflag)) ) { + if (nflag != 0 && (((flag & nflag) == flag) || ((flag & nflag) == nflag))) { conflicts.append('['); conflicts.append(first.getClass().getName()); conflicts.append(':'); @@ -425,15 +415,14 @@ public class GroupChannel extends ChannelInterceptorBase conflicts.append(':'); conflicts.append(nflag); conflicts.append("] "); - }//end if + } // end if next = next.getNext(); - }//while - }//end if + } // while + } // end if first = first.getNext(); - }//while - if ( conflicts.length() > 0 ) { - throw new ChannelException(sm.getString("groupChannel.optionFlag.conflict", - conflicts.toString())); + } // while + if (conflicts.length() > 0) { + throw new ChannelException(sm.getString("groupChannel.optionFlag.conflict", conflicts.toString())); } } @@ -470,8 +459,8 @@ public class GroupChannel extends ChannelInterceptorBase log.error(sm.getString("groupChannel.unable.sendHeartbeat"), e); } } - heartbeatFuture = utilityExecutor.scheduleWithFixedDelay(new HeartbeatRunnable(), - heartbeatSleeptime, heartbeatSleeptime, TimeUnit.MILLISECONDS); + heartbeatFuture = utilityExecutor.scheduleWithFixedDelay(new HeartbeatRunnable(), heartbeatSleeptime, + heartbeatSleeptime, TimeUnit.MILLISECONDS); } } @@ -499,6 +488,7 @@ public class GroupChannel extends ChannelInterceptorBase /** * Returns the first interceptor of the stack. Useful for traversal. + * * @return ChannelInterceptor */ public ChannelInterceptor getFirstInterceptor() { @@ -551,7 +541,7 @@ public class GroupChannel extends ChannelInterceptorBase @Override public void addMembershipListener(MembershipListener membershipListener) { - if (!this.membershipListeners.contains(membershipListener) ) { + if (!this.membershipListeners.contains(membershipListener)) { this.membershipListeners.add(membershipListener); } } @@ -563,11 +553,11 @@ public class GroupChannel extends ChannelInterceptorBase @Override public void addChannelListener(ChannelListener channelListener) { - if (!this.channelListeners.contains(channelListener) ) { + if (!this.channelListeners.contains(channelListener)) { this.channelListeners.add(channelListener); } else { - throw new IllegalArgumentException(sm.getString("groupChannel.listener.alreadyExist", - channelListener,channelListener.getClass().getName())); + throw new IllegalArgumentException(sm.getString("groupChannel.listener.alreadyExist", channelListener, + channelListener.getClass().getName())); } } @@ -578,14 +568,14 @@ public class GroupChannel extends ChannelInterceptorBase @Override public Iterator<ChannelInterceptor> getInterceptors() { - return new InterceptorIterator(this.getNext(),this.coordinator); + return new InterceptorIterator(this.getNext(), this.coordinator); } /** * Enables/disables the option check<br> - * Setting this to true, will make the GroupChannel perform a conflict check - * on the interceptors. If two interceptors are using the same option flag - * and throw an error upon start. + * Setting this to true, will make the GroupChannel perform a conflict check on the interceptors. If two + * interceptors are using the same option flag and throw an error upon start. + * * @param optionCheck boolean */ public void setOptionCheck(boolean optionCheck) { @@ -595,6 +585,7 @@ public class GroupChannel extends ChannelInterceptorBase /** * Configure local heartbeat sleep time<br> * Only used when <code>getHeartbeat()==true</code> + * * @param heartbeatSleeptime long - time in milliseconds to sleep between heartbeats */ public void setHeartbeatSleeptime(long heartbeatSleeptime) { @@ -602,9 +593,9 @@ public class GroupChannel extends ChannelInterceptorBase } /** - * Enables or disables local heartbeat. - * if <code>setHeartbeat(true)</code> is invoked then the channel will start an internal - * thread to invoke <code>Channel.heartbeat()</code> every <code>getHeartbeatSleeptime</code> milliseconds + * Enables or disables local heartbeat. if <code>setHeartbeat(true)</code> is invoked then the channel will start an + * internal thread to invoke <code>Channel.heartbeat()</code> every <code>getHeartbeatSleeptime</code> milliseconds + * * @param heartbeat boolean */ @Override @@ -623,8 +614,8 @@ public class GroupChannel extends ChannelInterceptorBase } /** - * @return the sleep time in milliseconds that the internal heartbeat will - * sleep in between invocations of <code>Channel.heartbeat()</code> + * @return the sleep time in milliseconds that the internal heartbeat will sleep in between invocations of + * <code>Channel.heartbeat()</code> */ @Override public long getHeartbeatSleeptime() { @@ -672,8 +663,7 @@ public class GroupChannel extends ChannelInterceptorBase } @Override - public ObjectName preRegister(MBeanServer server, ObjectName name) - throws Exception { + public ObjectName preRegister(MBeanServer server, ObjectName name) throws Exception { // NOOP return null; } @@ -699,6 +689,7 @@ public class GroupChannel extends ChannelInterceptorBase public static class InterceptorIterator implements Iterator<ChannelInterceptor> { private final ChannelInterceptor end; private ChannelInterceptor start; + public InterceptorIterator(ChannelInterceptor start, ChannelInterceptor end) { this.end = end; this.start = start; @@ -706,13 +697,13 @@ public class GroupChannel extends ChannelInterceptorBase @Override public boolean hasNext() { - return start!=null && start != end; + return start != null && start != end; } @Override public ChannelInterceptor next() { ChannelInterceptor result = null; - if ( hasNext() ) { + if (hasNext()) { result = start; start = start.getNext(); } @@ -721,15 +712,17 @@ public class GroupChannel extends ChannelInterceptorBase @Override public void remove() { - //empty operation + // empty operation } } /** - * <p>Title: Internal heartbeat runnable</p> - * - * <p>Description: if <code>Channel.getHeartbeat()==true</code> then a thread of this class - * is created</p> + * <p> + * Title: Internal heartbeat runnable + * </p> + * <p> + * Description: if <code>Channel.getHeartbeat()==true</code> then a thread of this class is created + * </p> */ public class HeartbeatRunnable implements Runnable { @Override diff --git a/java/org/apache/catalina/tribes/group/GroupChannelMBean.java b/java/org/apache/catalina/tribes/group/GroupChannelMBean.java index 83bbaa6487..aecc054132 100644 --- a/java/org/apache/catalina/tribes/group/GroupChannelMBean.java +++ b/java/org/apache/catalina/tribes/group/GroupChannelMBean.java @@ -39,11 +39,9 @@ public interface GroupChannelMBean { void stop(int svc) throws ChannelException; - UniqueId send(Member[] destination, Serializable msg, int options) - throws ChannelException; + UniqueId send(Member[] destination, Serializable msg, int options) throws ChannelException; - UniqueId send(Member[] destination, Serializable msg, int options, ErrorHandler handler) - throws ChannelException; + UniqueId send(Member[] destination, Serializable msg, int options, ErrorHandler handler) throws ChannelException; void addMembershipListener(MembershipListener listener); @@ -53,9 +51,9 @@ public interface GroupChannelMBean { void removeChannelListener(ChannelListener listener); - boolean hasMembers() ; + boolean hasMembers(); - Member[] getMembers() ; + Member[] getMembers(); Member getLocalMember(boolean incAlive); diff --git a/java/org/apache/catalina/tribes/group/InterceptorPayload.java b/java/org/apache/catalina/tribes/group/InterceptorPayload.java index d0b44c812f..3aceeea3d7 100644 --- a/java/org/apache/catalina/tribes/group/InterceptorPayload.java +++ b/java/org/apache/catalina/tribes/group/InterceptorPayload.java @@ -18,7 +18,7 @@ package org.apache.catalina.tribes.group; import org.apache.catalina.tribes.ErrorHandler; -public class InterceptorPayload { +public class InterceptorPayload { private ErrorHandler errorHandler; public ErrorHandler getErrorHandler() { diff --git a/java/org/apache/catalina/tribes/group/Response.java b/java/org/apache/catalina/tribes/group/Response.java index 91aa73fba5..6d53e1b687 100644 --- a/java/org/apache/catalina/tribes/group/Response.java +++ b/java/org/apache/catalina/tribes/group/Response.java @@ -26,6 +26,7 @@ import org.apache.catalina.tribes.Member; public class Response { private Member source; private Serializable message; + public Response() { } diff --git a/java/org/apache/catalina/tribes/group/RpcCallback.java b/java/org/apache/catalina/tribes/group/RpcCallback.java index 83ee4ec339..e376a7eefe 100644 --- a/java/org/apache/catalina/tribes/group/RpcCallback.java +++ b/java/org/apache/catalina/tribes/group/RpcCallback.java @@ -21,23 +21,26 @@ import java.io.Serializable; import org.apache.catalina.tribes.Member; /** - * The RpcCallback interface is an interface for the Tribes channel to request a - * response object to a request that came in. + * The RpcCallback interface is an interface for the Tribes channel to request a response object to a request that came + * in. */ public interface RpcCallback { /** * Allows sending a response to a received message. - * @param msg The message + * + * @param msg The message * @param sender Member + * * @return Serializable object, <code>null</code> if no reply should be sent */ Serializable replyRequest(Serializable msg, Member sender); /** - * If the reply has already been sent to the requesting thread, - * the rpc callback can handle any data that comes in after the fact. - * @param msg The message + * If the reply has already been sent to the requesting thread, the rpc callback can handle any data that comes in + * after the fact. + * + * @param msg The message * @param sender Member */ void leftOver(Serializable msg, Member sender); diff --git a/java/org/apache/catalina/tribes/group/RpcChannel.java b/java/org/apache/catalina/tribes/group/RpcChannel.java index c72963f2e5..6f0b0c087b 100644 --- a/java/org/apache/catalina/tribes/group/RpcChannel.java +++ b/java/org/apache/catalina/tribes/group/RpcChannel.java @@ -50,13 +50,13 @@ public class RpcChannel implements ChannelListener { private byte[] rpcId; private int replyMessageOptions = 0; - private final ConcurrentMap<RpcCollectorKey, RpcCollector> responseMap = new ConcurrentHashMap<>(); + private final ConcurrentMap<RpcCollectorKey,RpcCollector> responseMap = new ConcurrentHashMap<>(); /** - * Create an RPC channel. You can have several RPC channels attached to a group - * all separated out by the uniqueness - * @param rpcId - the unique Id for this RPC group - * @param channel Channel + * Create an RPC channel. You can have several RPC channels attached to a group all separated out by the uniqueness + * + * @param rpcId - the unique Id for this RPC group + * @param channel Channel * @param callback RpcCallback */ public RpcChannel(byte[] rpcId, Channel channel, RpcCallback callback) { @@ -69,42 +69,41 @@ public class RpcChannel implements ChannelListener { /** * Send a message and wait for the response. - * @param destination Member[] - the destination for the message, and the members you request a reply from - * @param message Serializable - the message you are sending out - * @param rpcOptions int - FIRST_REPLY, MAJORITY_REPLY or ALL_REPLY + * + * @param destination Member[] - the destination for the message, and the members you request a reply from + * @param message Serializable - the message you are sending out + * @param rpcOptions int - FIRST_REPLY, MAJORITY_REPLY or ALL_REPLY * @param channelOptions channel sender options - * @param timeout long - timeout in milliseconds, if no reply is received within this time null is returned + * @param timeout long - timeout in milliseconds, if no reply is received within this time null is returned + * * @return Response[] - an array of response objects. + * * @throws ChannelException Error sending message */ - public Response[] send(Member[] destination, - Serializable message, - int rpcOptions, - int channelOptions, - long timeout) throws ChannelException { + public Response[] send(Member[] destination, Serializable message, int rpcOptions, int channelOptions, long timeout) + throws ChannelException { - if ( destination==null || destination.length == 0 ) { + if (destination == null || destination.length == 0) { return new Response[0]; } - //avoid dead lock - int sendOptions = - channelOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK; + // avoid dead lock + int sendOptions = channelOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK; RpcCollectorKey key = new RpcCollectorKey(UUIDGenerator.randomUUID(false)); - RpcCollector collector = new RpcCollector(key,rpcOptions,destination.length); + RpcCollector collector = new RpcCollector(key, rpcOptions, destination.length); try { synchronized (collector) { - if ( rpcOptions != NO_REPLY ) { + if (rpcOptions != NO_REPLY) { responseMap.put(key, collector); } RpcMessage rmsg = new RpcMessage(rpcId, key.id, message); channel.send(destination, rmsg, sendOptions); - if ( rpcOptions != NO_REPLY ) { + if (rpcOptions != NO_REPLY) { collector.wait(timeout); } } - } catch ( InterruptedException ix ) { + } catch (InterruptedException ix) { Thread.currentThread().interrupt(); } finally { responseMap.remove(key); @@ -114,9 +113,9 @@ public class RpcChannel implements ChannelListener { @Override public void messageReceived(Serializable msg, Member sender) { - RpcMessage rmsg = (RpcMessage)msg; + RpcMessage rmsg = (RpcMessage) msg; RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid); - if ( rmsg.reply ) { + if (rmsg.reply) { RpcCollector collector = responseMap.get(key); if (collector == null) { if (!(rmsg instanceof RpcMessage.NoRpcChannelReply)) { @@ -124,9 +123,9 @@ public class RpcChannel implements ChannelListener { } } else { synchronized (collector) { - //make sure it hasn't been removed - if ( responseMap.containsKey(key) ) { - if ( (rmsg instanceof RpcMessage.NoRpcChannelReply) ) { + // make sure it hasn't been removed + if (responseMap.containsKey(key)) { + if ((rmsg instanceof RpcMessage.NoRpcChannelReply)) { collector.destcnt--; } else { collector.addResponse(rmsg.message, sender); @@ -135,27 +134,30 @@ public class RpcChannel implements ChannelListener { collector.notifyAll(); } } else { - if (! (rmsg instanceof RpcMessage.NoRpcChannelReply) ) { + if (!(rmsg instanceof RpcMessage.NoRpcChannelReply)) { callback.leftOver(rmsg.message, sender); } } - }//synchronized - }//end if + } // synchronized + } // end if } else { boolean finished = false; - final ExtendedRpcCallback excallback = (callback instanceof ExtendedRpcCallback)?((ExtendedRpcCallback)callback) : null; - boolean asyncReply = ((replyMessageOptions & Channel.SEND_OPTIONS_ASYNCHRONOUS) == Channel.SEND_OPTIONS_ASYNCHRONOUS); - Serializable reply = callback.replyRequest(rmsg.message,sender); + final ExtendedRpcCallback excallback = + (callback instanceof ExtendedRpcCallback) ? ((ExtendedRpcCallback) callback) : null; + boolean asyncReply = + ((replyMessageOptions & Channel.SEND_OPTIONS_ASYNCHRONOUS) == Channel.SEND_OPTIONS_ASYNCHRONOUS); + Serializable reply = callback.replyRequest(rmsg.message, sender); ErrorHandler handler = null; final Serializable request = msg; final Serializable response = reply; final Member fsender = sender; - if (excallback!=null && asyncReply) { + if (excallback != null && asyncReply) { handler = new ErrorHandler() { @Override public void handleError(ChannelException x, UniqueId id) { excallback.replyFailed(request, response, fsender, x); } + @Override public void handleCompletion(UniqueId id) { excallback.replySucceeded(request, response, fsender); @@ -165,23 +167,25 @@ public class RpcChannel implements ChannelListener { rmsg.reply = true; rmsg.message = reply; try { - if (handler!=null) { - channel.send(new Member[] {sender}, rmsg,replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK, handler); + if (handler != null) { + channel.send(new Member[] { sender }, rmsg, + replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK, handler); } else { - channel.send(new Member[] {sender}, rmsg,replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK); + channel.send(new Member[] { sender }, rmsg, + replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK); } finished = true; - } catch ( Exception x ) { + } catch (Exception x) { if (excallback != null && !asyncReply) { excallback.replyFailed(rmsg.message, reply, sender, x); } else { - log.error(sm.getString("rpcChannel.replyFailed"),x); + log.error(sm.getString("rpcChannel.replyFailed"), x); } } if (finished && excallback != null && !asyncReply) { excallback.replySucceeded(rmsg.message, reply, sender); } - }//end if + } // end if } public void breakdown() { @@ -190,9 +194,9 @@ public class RpcChannel implements ChannelListener { @Override public boolean accept(Serializable msg, Member sender) { - if ( msg instanceof RpcMessage ) { - RpcMessage rmsg = (RpcMessage)msg; - return Arrays.equals(rmsg.rpcId,rpcId); + if (msg instanceof RpcMessage) { + RpcMessage rmsg = (RpcMessage) msg; + return Arrays.equals(rmsg.rpcId, rpcId); } else { return false; } @@ -246,22 +250,22 @@ public class RpcChannel implements ChannelListener { } public void addResponse(Serializable message, Member sender) { - Response resp = new Response(sender,message); + Response resp = new Response(sender, message); responses.add(resp); } public boolean isComplete() { - if ( destcnt <= 0 ) { + if (destcnt <= 0) { return true; } switch (options) { case ALL_REPLY: return destcnt == responses.size(); case MAJORITY_REPLY: - float perc = ((float)responses.size()) / ((float)destcnt); + float perc = ((float) responses.size()) / ((float) destcnt); return perc >= 0.50f; case FIRST_REPLY: - return responses.size()>0; + return responses.size() > 0; default: return false; } @@ -274,8 +278,8 @@ public class RpcChannel implements ChannelListener { @Override public boolean equals(Object o) { - if ( o instanceof RpcCollector ) { - RpcCollector r = (RpcCollector)o; + if (o instanceof RpcCollector) { + RpcCollector r = (RpcCollector) o; return r.key.equals(this.key); } else { return false; @@ -289,20 +293,21 @@ public class RpcChannel implements ChannelListener { public static class RpcCollectorKey { final byte[] id; + public RpcCollectorKey(byte[] id) { this.id = id; } @Override public int hashCode() { - return id[0]+id[1]+id[2]+id[3]; + return id[0] + id[1] + id[2] + id[3]; } @Override public boolean equals(Object o) { - if ( o instanceof RpcCollectorKey ) { - RpcCollectorKey r = (RpcCollectorKey)o; - return Arrays.equals(id,r.id); + if (o instanceof RpcCollectorKey) { + RpcCollectorKey r = (RpcCollectorKey) o; + return Arrays.equals(id, r.id); } else { return false; } diff --git a/java/org/apache/catalina/tribes/group/RpcMessage.java b/java/org/apache/catalina/tribes/group/RpcMessage.java index 0fc804d219..052befc9e5 100644 --- a/java/org/apache/catalina/tribes/group/RpcMessage.java +++ b/java/org/apache/catalina/tribes/group/RpcMessage.java @@ -32,7 +32,7 @@ public class RpcMessage implements Externalizable { protected boolean reply = false; public RpcMessage() { - //for serialization + // for serialization } public RpcMessage(byte[] rpcId, byte[] uuid, Serializable message) { @@ -42,7 +42,7 @@ public class RpcMessage implements Externalizable { } @Override - public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException { + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { reply = in.readBoolean(); int length = in.readInt(); uuid = new byte[length]; @@ -50,7 +50,7 @@ public class RpcMessage implements Externalizable { length = in.readInt(); rpcId = new byte[length]; in.readFully(rpcId); - message = (Serializable)in.readObject(); + message = (Serializable) in.readObject(); } @Override @@ -82,7 +82,7 @@ public class RpcMessage implements Externalizable { } public NoRpcChannelReply(byte[] rpcid, byte[] uuid) { - super(rpcid,uuid,null); + super(rpcid, uuid, null); reply = true; } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org