Author: markt
Date: Thu Sep 17 20:30:44 2015
New Revision: 1703680
URL: http://svn.apache.org/viewvc?rev=1703680&view=rev
Log:
Fix multiple concurrency issues in MemberImpl
Concurrent updates to some properties were likely to lead to issues.
Fix the Javadoc warnings
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/membership/MemberImpl.java
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/membership/MemberImpl.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/MemberImpl.java?rev=1703680&r1=1703679&r2=1703680&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/membership/MemberImpl.java
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/membership/MemberImpl.java Thu
Sep 17 20:30:44 2015
@@ -47,21 +47,21 @@ public class MemberImpl implements Membe
/**
* The listen host for this member
*/
- protected byte[] host;
- protected transient String hostname;
+ protected volatile byte[] host;
+ protected volatile transient String hostname;
/**
* The tcp listen port for this member
*/
- protected int port;
+ protected volatile int port;
/**
* The udp listen port for this member
*/
- protected int udpPort = -1;
+ protected volatile int udpPort = -1;
/**
* The tcp/SSL listen port for this member
*/
- protected int securePort = -1;
+ protected volatile int securePort = -1;
/**
* Counter for how many broadcast messages have been sent from this member
@@ -87,24 +87,24 @@ public class MemberImpl implements Membe
/**
* Unique session Id for this member
*/
- protected byte[] uniqueId = new byte[16];
+ protected volatile byte[] uniqueId = new byte[16];
/**
* Custom payload that an app framework can broadcast
* Also used to transport stop command.
*/
- protected byte[] payload = new byte[0];
+ protected volatile byte[] payload = new byte[0];
/**
* Command, so that the custom payload doesn't have to be used
* This is for internal tribes use, such as SHUTDOWN_COMMAND
*/
- protected byte[] command = new byte[0];
+ protected volatile byte[] command = new byte[0];
/**
* Domain if we want to filter based on domain.
*/
- protected byte[] domain = new byte[0];
+ protected volatile byte[] domain = new byte[0];
/**
* Empty constructor for serialization
@@ -114,10 +114,14 @@ public class MemberImpl implements Membe
}
/**
- * Construct a new member object
+ * Construct a new member object.
+ *
* @param host - the tcp listen host
* @param port - the tcp listen port
* @param aliveTime - the number of milliseconds since this member was
created
+ *
+ * @throws IOException If there is an error converting the host name to an
+ * IP address
*/
public MemberImpl(String host,
int port,
@@ -172,7 +176,7 @@ public class MemberImpl implements Membe
@Override
- public int getDataLength() {
+ public synchronized int getDataLength() {
return TRIBES_MBR_BEGIN.length+ //start pkg
4+ //data length
8+ //alive time
@@ -193,7 +197,7 @@ public class MemberImpl implements Membe
@Override
- public byte[] getData(boolean getalive, boolean reset) {
+ public synchronized byte[] getData(boolean getalive, boolean reset) {
if ( reset ) dataPkg = null;
//look in cache first
if ( dataPkg!=null ) {
@@ -223,9 +227,7 @@ public class MemberImpl implements Membe
//payload length - 4 bytes
//payload plen bytes
//end package TRIBES_MBR_END.length
- byte[] addr = host;
long alive=System.currentTimeMillis()-getServiceStartTime();
- byte hl = (byte)addr.length;
byte[] data = new byte[getDataLength()];
int bodylength = (getDataLength() - TRIBES_MBR_BEGIN.length -
TRIBES_MBR_END.length - 4);
@@ -253,10 +255,10 @@ public class MemberImpl implements Membe
XByteBuffer.toBytes(udpPort,data,pos);
pos += 4;
//host length
- data[pos++] = hl;
+ data[pos++] = (byte) host.length;
//host
- System.arraycopy(addr,0,data,pos,addr.length);
- pos+=addr.length;
+ System.arraycopy(host,0,data,pos,host.length);
+ pos+=host.length;
//clen - 4 bytes
XByteBuffer.toBytes(command.length,data,pos);
pos+=4;
@@ -287,9 +289,12 @@ public class MemberImpl implements Membe
return data;
}
/**
- * Deserializes a member from data sent over the wire
- * @param data - the bytes received
- * @return a member object.
+ * Deserializes a member from data sent over the wire.
+ *
+ * @param data The bytes received
+ * @param member The member object to populate
+ *
+ * @return The populated member object.
*/
public static Member getMember(byte[] data, MemberImpl member) {
return getMember(data,0,data.length,member);
@@ -440,6 +445,7 @@ public class MemberImpl implements Membe
if ( this.hostname != null ) return hostname;
else {
try {
+ byte[] host = this.host;
if (DO_DNS_LOOKUPS)
this.hostname =
java.net.InetAddress.getByAddress(host).getHostName();
else
@@ -555,7 +561,8 @@ public class MemberImpl implements Membe
/**
* Returns true if the param o is a McastMember with the same name
- * @param o
+ *
+ * @param o The object to test for equality
*/
@Override
public boolean equals(Object o) {
@@ -568,20 +575,22 @@ public class MemberImpl implements Membe
return false;
}
- public void setHost(byte[] host) {
+ public synchronized void setHost(byte[] host) {
this.host = host;
}
public void setHostname(String host) throws IOException {
hostname = host;
- this.host = java.net.InetAddress.getByName(host).getAddress();
+ synchronized (this) {
+ this.host = java.net.InetAddress.getByName(host).getAddress();
+ }
}
public void setMsgCount(int msgCount) {
this.msgCount = msgCount;
}
- public void setPort(int port) {
+ public synchronized void setPort(int port) {
this.port = port;
this.dataPkg = null;
}
@@ -590,39 +599,50 @@ public class MemberImpl implements Membe
this.serviceStartTime = serviceStartTime;
}
- public void setUniqueId(byte[] uniqueId) {
+ public synchronized void setUniqueId(byte[] uniqueId) {
this.uniqueId = uniqueId!=null?uniqueId:new byte[16];
getData(true,true);
}
@Override
- public void setPayload(byte[] payload) {
- byte[] oldpayload = this.payload;
- this.payload = payload!=null?payload:new byte[0];
- if ( this.getData(true,true).length > McastServiceImpl.MAX_PACKET_SIZE
) {
- this.payload = oldpayload;
- throw new
IllegalArgumentException(sm.getString("memberImpl.large.payload"));
+ public synchronized void setPayload(byte[] payload) {
+ // longs to avoid any possibility of overflow
+ long oldPayloadLength = 0;
+ if (this.payload != null) {
+ oldPayloadLength = this.payload.length;
}
-
+ long newPayloadLength = 0;
+ if (payload != null) {
+ newPayloadLength = payload.length;
+ }
+ if (newPayloadLength > oldPayloadLength) {
+ // It is possible that the max packet size will be exceeded
+ if ((newPayloadLength - oldPayloadLength + getData(false,
false).length) >
+ McastServiceImpl.MAX_PACKET_SIZE) {
+ throw new
IllegalArgumentException(sm.getString("memberImpl.large.payload"));
+ }
+ }
+ this.payload = payload != null ? payload : new byte[0];
+ getData(true, true);
}
@Override
- public void setCommand(byte[] command) {
+ public synchronized void setCommand(byte[] command) {
this.command = command!=null?command:new byte[0];
getData(true,true);
}
- public void setDomain(byte[] domain) {
+ public synchronized void setDomain(byte[] domain) {
this.domain = domain!=null?domain:new byte[0];
getData(true,true);
}
- public void setSecurePort(int securePort) {
+ public synchronized void setSecurePort(int securePort) {
this.securePort = securePort;
this.dataPkg = null;
}
- public void setUdpPort(int port) {
+ public synchronized void setUdpPort(int port) {
this.udpPort = port;
this.dataPkg = null;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]