Author: fhanik
Date: Tue May 2 13:30:17 2006
New Revision: 399039
URL: http://svn.apache.org/viewcvs?rev=399039&view=rev
Log:
Refactored the sender so that it's easy to transfer properties without missing
important settings
Moved shutdown payload to the Member interface so that an app can differentiate
between shutdown and crash
Modified:
tomcat/container/tc5.5.x/modules/groupcom/doc/introduction.xml
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/AbstractSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/MultipointBioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/PooledMultiSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/TestNioSender.java
Modified: tomcat/container/tc5.5.x/modules/groupcom/doc/introduction.xml
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/doc/introduction.xml?rev=399039&r1=399038&r2=399039&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/doc/introduction.xml (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/doc/introduction.xml Tue May 2
13:30:17 2006
@@ -162,7 +162,8 @@
to the sender. This is a unique feature that adds an incredible
amount value to the application
developer. Most frameworks here will tell you that the message was
delivered, and the application
developer has to build in logic on whether the message was actually
processed properly by the application
- on the remote node.
+ on the remote node. If configured, Tribes will throw an exception
when it receives an ACK_FAIL
+ and associate that exception with the member that didn't process the
message.
</li>
</ol>
You can of course write even more sophisticated guarantee levels, and some
of them will be mentioned later on
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java?rev=399039&r1=399038&r2=399039&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/Member.java
Tue May 2 13:30:17 2006
@@ -31,6 +31,13 @@
public interface Member {
+
+ /**
+ * When a member leaves the cluster, the payload of the memberDisappeared
member
+ * will be the following bytes.
+ */
+ public static final byte[] SHUTDOWN_PAYLOAD = new byte[] {66, 65, 66, 89,
45, 65, 76, 69, 88};
+
/**
* Return implementation specific properties about this cluster node.
*/
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=399039&r1=399038&r2=399039&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/membership/McastServiceImpl.java
Tue May 2 13:30:17 2006
@@ -25,6 +25,7 @@
import org.apache.catalina.tribes.MembershipListener;
import java.util.Arrays;
import java.net.SocketTimeoutException;
+import org.apache.catalina.tribes.Member;
/**
* A <b>membership</b> implementation using simple multicast.
@@ -112,8 +113,6 @@
*/
protected InetAddress mcastBindAddress = null;
- protected static final byte[] STOP_PAYLOAD = new byte[] {66, 65, 66, 89,
45, 65, 76, 69, 88};
-
/**
* Create a new mcast service impl
* @param member - the local member
@@ -213,7 +212,7 @@
receiver = null;
//send a stop message
byte[] payload = member.getPayload();
- member.setPayload(STOP_PAYLOAD);
+ member.setPayload(Member.SHUTDOWN_PAYLOAD);
member.getData(true,true);
send();
//restore payload
@@ -238,7 +237,7 @@
if (log.isDebugEnabled())
log.debug("Mcast receive ping from member " + m);
- if (Arrays.equals(m.getPayload(), STOP_PAYLOAD)) {
+ if (Arrays.equals(m.getPayload(), Member.SHUTDOWN_PAYLOAD)) {
if (log.isDebugEnabled()) log.debug("Member has shutdown:" +
m);
membership.removeMcastMember(m);
service.memberDisappeared(m);
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/AbstractSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/AbstractSender.java?rev=399039&r1=399038&r2=399039&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/AbstractSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/AbstractSender.java
Tue May 2 13:30:17 2006
@@ -48,7 +48,7 @@
private Member destination;
private InetAddress address;
private int port;
- private int maxRetryAttempts = 2;//zero resends
+ private int maxRetryAttempts = 1;//1 resends
private int attempt;
private boolean tcpNoDelay = true;
private boolean soKeepAlive = false;
@@ -58,22 +58,38 @@
private int soLingerTime = 3;
private int soTrafficClass = 0x04 | 0x08 | 0x010;
private boolean throwOnFailedAck = false;
+
+ /**
+ * transfers sender properties from one sender to another
+ * @param from AbstractSender
+ * @param to AbstractSender
+ */
+ public static void transferProperties(AbstractSender from, AbstractSender
to) {
+ to.rxBufSize = from.rxBufSize;
+ to.txBufSize = from.txBufSize;
+ to.directBuffer = from.directBuffer;
+ to.keepAliveCount = from.keepAliveCount;
+ to.keepAliveTime = from.keepAliveTime;
+ to.timeout = from.timeout;
+ to.destination = from.destination;
+ to.address = from.address;
+ to.port = from.port;
+ to.maxRetryAttempts = from.maxRetryAttempts;
+ to.tcpNoDelay = from.tcpNoDelay;
+ to.soKeepAlive = from.soKeepAlive;
+ to.ooBInline = from.ooBInline;
+ to.soReuseAddress = from.soReuseAddress;
+ to.soLingerOn = from.soLingerOn;
+ to.soLingerTime = from.soLingerTime;
+ to.soTrafficClass = from.soTrafficClass;
+ to.throwOnFailedAck = from.throwOnFailedAck;
+ }
+
+
public AbstractSender() {
}
- public AbstractSender(Member destination) throws UnknownHostException {
- this.destination = destination;
- this.address = InetAddress.getByAddress(destination.getHost());
- this.port = destination.getPort();
- }
-
- public AbstractSender(Member destination, int rxBufSize, int txBufSize)
throws UnknownHostException {
- this(destination);
- this.rxBufSize = rxBufSize;
- this.txBufSize = txBufSize;
- }
-
/**
* connect
*
@@ -268,4 +284,20 @@
public void setThrowOnFailedAck(boolean throwOnFailedAck) {
this.throwOnFailedAck = throwOnFailedAck;
}
+
+ public void setDestination(Member destination) throws UnknownHostException
{
+ this.destination = destination;
+ this.address = InetAddress.getByAddress(destination.getHost());
+ this.port = destination.getPort();
+
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ public void setAddress(InetAddress address) {
+ this.address = address;
+ }
+
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java?rev=399039&r1=399038&r2=399039&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioSender.java
Tue May 2 13:30:17 2006
@@ -71,15 +71,9 @@
// -------------------------------------------------------------
Constructor
- public BioSender(Member member) throws UnknownHostException {
- super(member);
- if (log.isDebugEnabled())
- log.debug(sm.getString("IDataSender.create",getAddress(), new
Integer(getPort())));
+ public BioSender() {
}
- public BioSender(Member member, int rxBufSize, int txBufSize) throws
UnknownHostException {
- super(member,rxBufSize,txBufSize);
- }
// ------------------------------------------------------------- Properties
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/MultipointBioSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/MultipointBioSender.java?rev=399039&r1=399038&r2=399039&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/MultipointBioSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/MultipointBioSender.java
Tue May 2 13:30:17 2006
@@ -57,12 +57,9 @@
try {
BioSender sender = (BioSender) bioSenders.get(destination[i]);
if (sender == null) {
- sender = new BioSender(destination[i], getRxBufSize(),
getTxBufSize());
- sender.setKeepAliveCount(getKeepAliveCount());
- sender.setKeepAliveTime(getKeepAliveTime());
- sender.setTimeout(getTimeout());
- sender.setMaxRetryAttempts(getMaxRetryAttempts());
- sender.setKeepAliveTime(getKeepAliveTime());
+ sender = new BioSender();
+ sender.transferProperties(this,sender);
+ sender.setDestination(destination[i]);
bioSenders.put(destination[i], sender);
}
result[i] = sender;
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/PooledMultiSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/PooledMultiSender.java?rev=399039&r1=399038&r2=399039&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/PooledMultiSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/PooledMultiSender.java
Tue May 2 13:30:17 2006
@@ -51,10 +51,7 @@
*/
public DataSender getNewDataSender() {
MultipointBioSender sender = new MultipointBioSender();
- sender.setTimeout(getTimeout());
- sender.setMaxRetryAttempts(getMaxRetryAttempts());
- sender.setRxBufSize(getRxBufSize());
- sender.setTxBufSize(getTxBufSize());
+ sender.transferProperties(this,sender);
return sender;
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=399039&r1=399038&r2=399039&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java
Tue May 2 13:30:17 2006
@@ -66,8 +66,8 @@
protected boolean connecting = false;
- public NioSender(Member destination) throws UnknownHostException {
- super(destination);
+ public NioSender() {
+ super();
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java?rev=399039&r1=399038&r2=399039&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
Tue May 2 13:30:17 2006
@@ -202,25 +202,22 @@
NioSender[] result = new NioSender[destination.length];
for ( int i=0; i<destination.length; i++ ) {
NioSender sender = (NioSender)nioSenders.get(destination[i]);
- if ( sender == null ) {
- try {
- sender = new NioSender(destination[i]);
+ try {
+
+ if (sender == null) {
+ sender = new NioSender();
+ sender.transferProperties(this, sender);
nioSenders.put(destination[i], sender);
- }catch ( UnknownHostException x ) {
- if ( cx == null ) cx = new ChannelException("Unable to
setup NioSender.",x);
- cx.addFaultyMember(destination[i],x);
}
- }
- if ( sender != null ) {
- sender.reset();
- sender.setSelector(selector);
- sender.setDirectBuffer(getDirectBuffer());
- sender.setRxBufSize(getRxBufSize());
- sender.setTxBufSize(getTxBufSize());
- sender.setTimeout(getTimeout());
- sender.setKeepAliveCount(getKeepAliveCount());
- sender.setKeepAliveTime(getKeepAliveTime());
- result[i] = sender;
+ if (sender != null) {
+ sender.reset();
+ sender.setDestination(destination[i]);
+ sender.setSelector(selector);
+ result[i] = sender;
+ }
+ }catch ( UnknownHostException x ) {
+ if (cx == null) cx = new ChannelException("Unable to setup
NioSender.", x);
+ cx.addFaultyMember(destination[i], x);
}
}
if ( cx != null ) throw cx;
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java?rev=399039&r1=399038&r2=399039&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/PooledParallelSender.java
Tue May 2 13:30:17 2006
@@ -55,11 +55,7 @@
public DataSender getNewDataSender() {
try {
ParallelNioSender sender = new ParallelNioSender();
- sender.setTimeout(getTimeout());
- sender.setMaxRetryAttempts(getMaxRetryAttempts());
- sender.setDirectBuffer(getDirectBuffer());
- sender.setRxBufSize(getRxBufSize());
- sender.setTxBufSize(getTxBufSize());
+ sender.transferProperties(this,sender);
return sender;
} catch ( IOException x ) {
throw new RuntimeException("Unable to open NIO selector.",x);
Modified:
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/TestNioSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/TestNioSender.java?rev=399039&r1=399038&r2=399039&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/TestNioSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/TestNioSender.java
Tue May 2 13:30:17 2006
@@ -48,7 +48,8 @@
public void init() throws Exception {
selector = Selector.open();
mbr = new MemberImpl("","localhost",4444,0);
- NioSender sender = new NioSender(mbr);
+ NioSender sender = new NioSender();
+ sender.setDestination(mbr);
sender.setDirectBuffer(true);
sender.setSelector(selector);
sender.setMessage(XByteBuffer.createDataPackage(getMessage(mbr)));
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]