User: hiram
Date: 01/02/27 20:35:02
Modified: src/main/org/jbossmq/cluster/udp Datagram.java
UDPNodeId.java UDPTransport.java
Added: src/main/org/jbossmq/cluster/udp UDPAdminStream.java
UDPStream.java
Removed: src/main/org/jbossmq/cluster/udp AdminStream.java
NormalStream.java
Log:
We can now send point-to-point or broadcast messages in the cluster. Refactored a lot
of the UDP implementation.
Revision Changes Path
1.3 +60 -53 jbossmq/src/main/org/jbossmq/cluster/udp/Datagram.java
Index: Datagram.java
===================================================================
RCS file:
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/udp/Datagram.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- Datagram.java 2001/02/22 06:12:29 1.2
+++ Datagram.java 2001/02/28 04:35:00 1.3
@@ -1,7 +1,7 @@
/*
* JBossMQ, the OpenSource JMS implementation
*
- * Distributable under GPL license.
+ * Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.cluster.udp;
@@ -17,30 +17,55 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
class Datagram {
- // MESSAGE FLAGS
- final static int DROPPABLE_MESSAGE_FLAG = 0;
- final static int KEEP_ORDER_MESSAGE_FLAG = 1;
-
// Field offsets in the header of the message
final static int ID_OFFSET = 0; // int
final static int LENGTH_OFFSET = 4; // int
final static int FRAGMENT_ID_OFFSET = 8; // short
- final static int SENDER_ID_OFFSET = 10; // 2 bytes
- final static int TOPIC_ID_OFFSET = 12; // short
- final static byte MESSAGE_FLAGS_OFFSET = 14; // byte
- final static int HEADER_SIZE = 15;
+ final static int SENDER_ID_OFFSET = 10; // 4 bytes
+ final static int TOPIC_ID_OFFSET = 14; // short
+ final static byte MESSAGE_FLAGS_OFFSET = 16; // byte
+ final static int HEADER_SIZE = 17;
+ // MESSAGE FLAGS
+ public final static byte DROPPABLE_FLAG = 1;
+ public final static byte KEEP_ORDER_FLAG = 2;
+ public final static byte ADMIN_FLAG = 4;
+
// the serialized message
byte data[];
- // if this is part of a fragment, next fragment should
- // point to the next fragment.
+ // Who sent this message
+ UDPNodeId senderId;
+ // nextFragment can be used to form a linked list of Datagrams
Datagram nextFragment;
/**
+ * This is used to build a new datagram
+ */
+ Datagram( int length,
+ short fragmentId,
+ UDPNodeId senderId,
+ short topic,
+ byte messageFlags,
+ byte payload[],
+ int offset,
+ int len) {
+
+ data = new byte[len + HEADER_SIZE];
+ SerializerUtil.writeIntTo(length, data, LENGTH_OFFSET);
+ SerializerUtil.writeShortTo(fragmentId, data, FRAGMENT_ID_OFFSET);
+ senderId.writeToByteArray(data, SENDER_ID_OFFSET);
+ SerializerUtil.writeShortTo(topic, data, TOPIC_ID_OFFSET);
+ data[MESSAGE_FLAGS_OFFSET] = messageFlags;
+
+ for (int i = HEADER_SIZE, j = offset; i < data.length; i++, j++)
+ data[i] = payload[j];
+ }
+
+ /**
* This is used to reconstruct a datagram that was serialized
*/
Datagram(byte datagram[], int length) {
@@ -50,8 +75,6 @@
}
}
-
-
/**
* Gets the original payload of the datagram.
* @return byte[]
@@ -95,8 +118,6 @@
return SerializerUtil.readIntFrom(data, LENGTH_OFFSET);
}
-
-
/**
* The topic under which this message was sent.
* @return short
@@ -105,44 +126,6 @@
return SerializerUtil.readShortFrom(data, TOPIC_ID_OFFSET);
}
- boolean isDroppable() {
- return ((data[MESSAGE_FLAGS_OFFSET] >> DROPPABLE_MESSAGE_FLAG) & 0x01)
!= 0;
- }
-
- boolean isKeepOrder() {
- return ((data[MESSAGE_FLAGS_OFFSET] >> KEEP_ORDER_MESSAGE_FLAG) &
0x01) != 0;
- }
-
- UDPNodeId senderId;
-
- /**
- * This is used to build a new datagram
- */
- Datagram(
- int id,
- int length,
- short fragmentId,
- UDPNodeId senderId,
- short topic,
- boolean droppable,
- boolean keepOrder,
- byte payload[],
- int offset,
- int len) {
-
- data = new byte[len + HEADER_SIZE];
- SerializerUtil.writeIntTo(id, data, ID_OFFSET);
- SerializerUtil.writeIntTo(length, data, LENGTH_OFFSET);
- SerializerUtil.writeShortTo(fragmentId, data, FRAGMENT_ID_OFFSET);
- senderId.writeToByteArray(data, SENDER_ID_OFFSET);
- SerializerUtil.writeShortTo(topic, data, TOPIC_ID_OFFSET);
- data[MESSAGE_FLAGS_OFFSET] =
- (byte) (((droppable ? 1 : 0) << DROPPABLE_MESSAGE_FLAG) |
((keepOrder ? 1 : 0) << KEEP_ORDER_MESSAGE_FLAG));
-
- for (int i = HEADER_SIZE, j = offset; i < data.length; i++, j++)
- data[i] = payload[j];
- }
-
/**
* The id of the sender of this message.
* @return int
@@ -155,5 +138,29 @@
t.readFromByteArray(data, SENDER_ID_OFFSET);
senderId = t;
return t;
+ }
+
+ /**
+ * returns true if all the given flags are set in the message.
+ */
+ boolean isMessageFlagsSet(int flags) {
+ return (data[MESSAGE_FLAGS_OFFSET]&flags)==flags;
+ }
+
+ /**
+ * Gets all the message flags.
+ */
+ byte getMessageFlags() {
+ return data[MESSAGE_FLAGS_OFFSET];
+ }
+
+ /**
+ * The id of the message. All the fragments message will have the
+ * same Id, just different fragment ids.
+ *
+ * @return int
+ */
+ void setId(int id) {
+ SerializerUtil.writeIntTo(id, data, ID_OFFSET);
}
}
1.2 +49 -15 jbossmq/src/main/org/jbossmq/cluster/udp/UDPNodeId.java
Index: UDPNodeId.java
===================================================================
RCS file:
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/udp/UDPNodeId.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- UDPNodeId.java 2001/02/22 06:12:29 1.1
+++ UDPNodeId.java 2001/02/28 04:35:01 1.2
@@ -1,56 +1,90 @@
+/*
+ * JBossMQ, the OpenSource JMS implementation
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
package org.jbossmq.cluster.udp;
-
-import java.io.Externalizable;
import java.net.InetAddress;
import org.jbossmq.cluster.NodeId;
import org.jbossmq.cluster.SerializerUtil;
/**
+ * This class contains all the information needed to
+ * identify a node in the cluster at the transport level.
+ * Every node in the cluster open two new UDP sockets to
+ * receive point to point and admin messages. That port numbers
+ * combined with the IP address of the machine provides
+ * us a the identity of the node.
*/
public class UDPNodeId implements NodeId {
InetAddress address;
int port;
-/**
- * UDPNodeId constructor comment.
- */
-public UDPNodeId() {
- super();
-}
+ /**
+ * UDPNodeId constructor comment.
+ */
+ public UDPNodeId() {
+ super();
+ }
+
+ /**
+ * You can compare two UDPNodeIds
+ */
public boolean equals(Object obj) {
try {
UDPNodeId o = (UDPNodeId)obj;
return o.address.equals(address) &&
- o.port == port;
+ o.port == port &&
+ o.adminPort == adminPort;
} catch (Throwable e) {
return false;
}
}
+
+ /**
+ * The Ip address hash should get good enough in most cases.
+ * (Bad when you are running multiple nodes on a single machine)
+ */
public int hashCode() {
return address.hashCode();
}
+
/**
+ * This is used by the Datagram class.
+ * The address of the UDPNodeId will be set from
+ * the sender field of the IP packet the datagram
+ * was received from.
*/
public void readFromByteArray(byte data[], int pos) {
try {
port = SerializerUtil.readUShortFrom(data, pos);
+ adminPort = SerializerUtil.readUShortFrom(data, pos+2);
} catch ( Exception ignore ) {
- }
-
- }
- public String toString() {
- return "UDPNodeId:["+address.getHostAddress()+":"+port+"]";
+ }
}
+
/**
+ * This is used by the Datagram class.
+ * The address of the UDPNodeId does not need to be
+ * serialized as the IP packet allready has a source
+ * IP packet field that we will take advantage of.
*/
public void writeToByteArray(byte data[], int pos) {
SerializerUtil.writeUShortTo(port,data,pos);
+ SerializerUtil.writeUShortTo(adminPort,data,pos+2);
}
-}
+
+ public String toString() {
+ return
"UDPNodeId:["+address.getHostAddress()+":"+port+":"+adminPort+"]";
+ }
+
+ int adminPort;
+}
\ No newline at end of file
1.3 +310 -92 jbossmq/src/main/org/jbossmq/cluster/udp/UDPTransport.java
Index: UDPTransport.java
===================================================================
RCS file:
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/udp/UDPTransport.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- UDPTransport.java 2001/02/22 06:12:29 1.2
+++ UDPTransport.java 2001/02/28 04:35:01 1.3
@@ -1,7 +1,7 @@
/*
* JBossMQ, the OpenSource JMS implementation
*
- * Distributable under GPL license.
+ * Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.cluster.udp;
@@ -13,9 +13,18 @@
import java.util.Hashtable;
import java.util.LinkedList;
import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.Externalizable;
+import java.io.ObjectInputStream;
import org.jbossmq.cluster.TransportListener;
+import org.jbossmq.Log;
+
+import java.util.Vector;
+
/**
* This Transport implements a Datagram based cluster transport.
* It Can use UDP or Multicast.
@@ -26,69 +35,41 @@
* Since Datagrams have a chance of not making it, this class
* also provides retransmision facilities.
*
+ * It uses two UDPStreams. The first stream is the broadcast stream
+ * and it receives multicast packets sent to the cluster.
+ * The second is a point stream and is used to directly send a message
+ * to this node.
+ *
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class UDPTransport implements org.jbossmq.cluster.Transport {
- // The class responsible for pulling messages from the network
- NormalStream normalStream = new NormalStream();
- // The interface into the cluster we report message arivals to
- TransportListener transportListener;
- // We incrementaly number the datagrams we send
- int nextDatagramId = 0;
-
- AdminStream adminStream = new AdminStream();
+ // A reference to the cluster the transport is working with
org.jbossmq.cluster.Cluster cluster;
- // Adderes datagrams will be sent to
-
- InetAddress clusterAddress;
- // Port datagrams will be sent to
- int clusterPort;
- // A message can be composed of multiple datagram fragments.
+ // The interface into the cluster we report message arivals to
+ private TransportListener transportListener;
+ // Receives the multicast messages
+ UDPStream broadcastStream = new UDPStream();
+ // Receives the messages addressed to this node
+ UDPStream pointStream = new UDPStream();
+ // The maximum size a datagram fragment can be.
int maxFragmentSize = 1024;
- // Socket that we will be sending data over.
- DatagramSocket normalSocket;
+ // Is the transport started?
boolean started = false;
- // I like simple constructors.
+ /**
+ * I like simple constructors.
+ */
public UDPTransport() {
- normalStream.transport = this;
+ broadcastStream.transport = this;
+ broadcastStream.setName("Broadcast");
+ pointStream.transport = this;
+ pointStream.setName("Point");
adminStream.transport = this;
}
-
- // I am assuming that the post increment is atomic,
- // otherwise this needs to get synchronized
- int getNextDatagramId() {
- return nextDatagramId++;
- }
-
- // A complete datagram has arrived to us from the InputStream
- // We build a object from it and send it to the TransportListener
- void messageArrivedEvent(Datagram dg) throws InterruptedException {
-
- if (!cluster.isListeningOn(dg.getTopicId()))
- return;
-
- byte data[];
- if (dg.getFragmentCount(maxFragmentSize) == 1) {
- data = dg.getData();
- } else {
- data = new byte[dg.getLength()];
- int dataPos = 0;
- while (dg != null) {
- for (int i = dataPos, j = Datagram.HEADER_SIZE; j <
dg.data.length; i++, j++)
- data[i] = dg.data[j];
- dataPos += dg.data.length - Datagram.HEADER_SIZE;
- dg = dg.nextFragment;
- }
- }
-
- transportListener.messageArrivedEvent(dg.getTopicId(),
dg.getSenderId(), data);
-
- }
// This is set when the transport is set in the cluster.
public void setCluster(org.jbossmq.cluster.Cluster newCluster) {
@@ -100,12 +81,6 @@
transportListener = c;
}
- // This called when the InputStream detects that another node
- // in the cluster is using our nodeId.
- void senderConflictEvent() {
- System.out.println("WARNNING: NodeId conflict detected.");
- }
-
// This method is used to initialize is
// transport layer.
synchronized public void setProperties(Hashtable t)
@@ -134,31 +109,36 @@
InetAddress group = InetAddress.getByName(groupString);
int port = Integer.parseInt(portString);
- DatagramSocket a;
- MulticastSocket s = new MulticastSocket(port);
- s.joinGroup(group);
+ DatagramSocket pointSocket;
+ DatagramSocket adminSocket;
+ MulticastSocket broadcastSocket = new
MulticastSocket(port);
+ broadcastSocket.joinGroup(group);
if (nodeInterfaceString != null) {
nodeId.address =
InetAddress.getByName(nodeInterfaceString);
} else {
nodeId.address = InetAddress.getLocalHost();
}
- s.setInterface(nodeId.address);
+ broadcastSocket.setInterface(nodeId.address);
if (nodePortString != null) {
nodeId.port = Integer.parseInt(nodePortString);
- a = new DatagramSocket(nodeId.port);
+ pointSocket = new DatagramSocket(nodeId.port);
} else {
- a = new DatagramSocket();
- nodeId.port = a.getLocalPort();
+ pointSocket = new DatagramSocket();
+ nodeId.port = pointSocket.getLocalPort();
}
-
- System.out.println("Local NodeId: "+nodeId);
- clusterPort = port;
- clusterAddress = group;
- normalSocket = s;
- adminSocket = a;
+ adminSocket = new DatagramSocket();
+ nodeId.adminPort = adminSocket.getLocalPort();
+
+ Log.log("["+this+"] Local NodeId: "+nodeId);
+
+ clusterId.address = group;
+ clusterId.port = port;
+ broadcastStream.socket = broadcastSocket;
+ pointStream.socket = pointSocket;
+ adminStream.socket = adminSocket;
} catch (IOException e) {
throw new
org.jbossmq.cluster.InvalidConfigurationException(
@@ -171,17 +151,19 @@
// Starts the transport
synchronized public void start() throws
org.jbossmq.cluster.InvalidStateException {
- if (normalSocket == null)
+ if (broadcastStream.socket == null)
throw new org.jbossmq.cluster.InvalidStateException("The
transport properties have not been set yet.");
started = true;
- normalStream.start();
+ broadcastStream.start();
+ pointStream.start();
adminStream.start();
}
// Stops the transport
synchronized public void stop() throws InterruptedException {
- normalStream.stop();
+ broadcastStream.stop();
+ pointStream.stop();
adminStream.stop();
started = false;
}
@@ -190,12 +172,260 @@
// Builds a datagram fragment chain with the given data
// and then sends it over the network
- public void send(short channelId, byte data[], boolean droppable, boolean
keepOrder)
+ public void send(short channelId, byte data[], boolean droppable, boolean
keepOrder) throws IOException, InterruptedException {
+ send( null, channelId, data, droppable, keepOrder);
+ }
+
+
+ // Our unique address in the cluster
+ UDPNodeId nodeId = new UDPNodeId();
+ // timmer Used to slow down the transmision of messages.
+ volatile long stopSendsTill=0;
+
+ //
+ // This is the message sent to request a packet be resent
+ //
+ static class ResendAdminDatagram implements java.io.Serializable {
+ ResendAdminDatagram(int j[], boolean b) {
+ datagramIds = j;
+ broadcast=b;
+ }
+ int datagramIds[];
+ boolean broadcast;
+ }
+
+ //
+ // This is the message sent when the requested packet is not in the cache.
+ //
+ static class ResendErrorAdminDatagram implements java.io.Serializable {
+ ResendErrorAdminDatagram(int j, boolean b) {
+ datagramId = j;
+ broadcast=b;
+ }
+ int datagramId;
+ boolean broadcast;
+ }
+
+ // This send is used to send the administrative messages.
+ public void adminSend(UDPNodeId dest, Object message) throws IOException,
InterruptedException {
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream os = new ObjectOutputStream(bos);
+ os.writeObject(message);
+ os.close();
+
+ byte data[] = bos.toByteArray();
+ Datagram dg = new Datagram(
+ data.length,
+ (short) 0,
+ nodeId,
+ (short) 0,
+ (byte)(Datagram.ADMIN_FLAG|Datagram.DROPPABLE_FLAG),
+ data,
+ 0,
+ data.length);
+
+ dg.setId(adminStream.getNextDatagramId());
+ Log.log("["+this+"] Sending admin datagram: "+dg.getId());
+ DatagramPacket packet =
+ new java.net.DatagramPacket(dg.data, dg.data.length, dest.address,
dest.adminPort);
+ adminStream.socket.send(packet);
+ }
+
+ void datagramArrived(Datagram dg) throws InterruptedException {
+
+ Log.log("["+this+"] A datagram arrived");
+
+ if( dg.isMessageFlagsSet(Datagram.ADMIN_FLAG) ) {
+ try {
+
+ Object o;
+ ByteArrayInputStream bais = new
ByteArrayInputStream(dg.getData());
+ ObjectInputStream is = new ObjectInputStream(bais);
+ o = is.readObject();
+ is.close();
+
+ if (o instanceof ResendAdminDatagram) {
+
+ ResendAdminDatagram radg = (ResendAdminDatagram) o;
+ for( int i=0; i < radg.datagramIds.length; i++ ) {
+ Log.log("["+this+"] RESEND REQUEST ARRIVED: " +
radg.datagramIds[i]);
+ resendDatagram(dg.getSenderId(),
radg.datagramIds[i], radg.broadcast);
+ }
+ } else if (o instanceof ResendErrorAdminDatagram) {
+
+ ResendErrorAdminDatagram readg =
(ResendErrorAdminDatagram) o;
+ Log.log("["+this+"] RESEND REQUEST FAILED: " +
readg.datagramId);
+ if (readg.broadcast) {
+
broadcastStream.removeFromDatagramStream(dg.getSenderId(), readg.datagramId);
+ } else {
+ pointStream.removeFromDatagramStream(dg.getSenderId(),
readg.datagramId);
+ }
+
+ }
+
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ } else {
+
+ // A complete datagram has arrived to us from the InputStream
+ // We build a object from it and send it to the
TransportListener
+ if (!cluster.isListeningOn(dg.getTopicId()))
+ return;
+
+ byte data[];
+ if (dg.getFragmentCount(maxFragmentSize) == 1) {
+ data = dg.getData();
+ } else {
+ data = new byte[dg.getLength()];
+ int dataPos = 0;
+ while (dg != null) {
+ for (int i = dataPos, j =
Datagram.HEADER_SIZE; j < dg.data.length; i++, j++)
+ data[i] = dg.data[j];
+ dataPos += dg.data.length -
Datagram.HEADER_SIZE;
+ dg = dg.nextFragment;
+ }
+ }
+
+ transportListener.messageArrivedEvent(dg.getTopicId(),
dg.getSenderId(), data);
+ }
+ }
+
+
+
+ // Resends the message if it was found in the cache
+ void resendDatagram(UDPNodeId requestor, int messageId, boolean broadcast)
throws InterruptedException, java.io.IOException {
+
+ slowDownSendRate();
+ if( broadcast ) {
+ Datagram dg = broadcastStream.getFromSentCache(messageId);
+ if(dg!=null) {
+ Log.log("["+this+"] BROADCAST RESEND REQUEST SERVICED:
" + messageId);
+ send(dg, false);
+ return;
+ } else {
+ Log.log("["+this+"] BROADCAST RESEND REQUEST NOT
SERVICED (not in cache): " + messageId);
+ }
+ } else {
+ Datagram dg = pointStream.getFromSentCache(messageId);
+ if(dg!=null) {
+ Log.log("["+this+"] POINT RESEND REQUEST SERVICED: " +
messageId);
+ send(requestor, dg, false);
+ return;
+ } else {
+ Log.log("["+this+"] POINT RESEND REQUEST NOT SERVICED
(not in cache): " + messageId);
+ }
+ }
+
+ ResendErrorAdminDatagram error = new
ResendErrorAdminDatagram(messageId,broadcast);
+ adminSend(requestor, error);
+ }
+
+ // This is set when the transport is set in the cluster.
+ public void slowDownSendRate() {
+ stopSendsTill = System.currentTimeMillis()+1000;
+ }
+
+
+ // Sends a datagram fragment chain over the network
+ synchronized void send(Datagram dg) throws InterruptedException,
java.io.IOException {
+ send( dg, true );
+ }
+
+ // Sends a datagram fragment chain over the network
+ synchronized void send(UDPNodeId dest, Datagram dg) throws
InterruptedException, java.io.IOException {
+ send( dest, dg, true );
+ }
+
+ // The broadcast address of the cluster
+ UDPNodeId clusterId = new UDPNodeId();
+
+ // Sends a datagram fragment chain over the network
+ synchronized void send(Datagram dg, boolean setDatagramId) throws
InterruptedException, java.io.IOException {
+
+ int id=0;
+ if( setDatagramId ) {
+ id = broadcastStream.getNextDatagramId();
+ dg.setId(id);
+ }
+
+ broadcastStream.addToSentCache(dg);
+
+ // Send the fragment chain.
+ Log.log("["+this+"] Sending Broadcast datagram: "+dg.getId());
+ while (dg != null) {
+
+ if( setDatagramId )
+ dg.setId(id);
+
+ DatagramPacket packet =
+ new java.net.DatagramPacket(dg.data, dg.data.length,
clusterId.address, clusterId.port);
+ broadcastStream.socket.send(packet);
+ dg = dg.nextFragment;
+ }
+ }
+
+ // Sends a datagram fragment chain over the network
+ synchronized void send(UDPNodeId dest, Datagram dg, boolean setDatagramId)
throws InterruptedException, java.io.IOException {
+ int id=0;
+ if( setDatagramId ) {
+ id = broadcastStream.getNextDatagramId();
+ dg.setId(id);
+ }
+
+ pointStream.addToSentCache(dg);
+
+ Log.log("["+this+"] Sending Point datagram: "+dg.getId());
+ // Send the fragment chain.
+ while (dg != null) {
+ if( setDatagramId )
+ dg.setId(id);
+
+ DatagramPacket packet =
+ new java.net.DatagramPacket(dg.data, dg.data.length,
dest.address, dest.port);
+ pointStream.socket.send(packet);
+ dg = dg.nextFragment;
+ }
+ }
+
+ // Receives admin messages addressed to this node
+ UDPAdminStream adminStream = new UDPAdminStream();
+
+ // Sends a resend request
+ void requestResend(UDPNodeId nodeId, Vector messageIds, boolean broadcast) {
+
+ int MAX_RESEND_ARRAY_SIZE = 100;
+ int msgs[] = new int[ messageIds.size()<MAX_RESEND_ARRAY_SIZE ?
messageIds.size() : MAX_RESEND_ARRAY_SIZE ];
+ for( int i=0; i<msgs.length; i++ ) {
+ msgs[i] = ((Integer)messageIds.elementAt(i)).intValue();
+ }
+
+ Log.log("["+this+"] SENDING RESEND REQUEST: " + messageIds);
+ ResendAdminDatagram m = new ResendAdminDatagram(msgs,broadcast);
+ try {
+ adminSend(nodeId, m);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ // Builds a datagram fragment chain with the given data
+ // and then sends it over the network
+ public void send(org.jbossmq.cluster.NodeId dest, short channelId, byte
data[], boolean droppable, boolean keepOrder)
throws IOException, InterruptedException {
- int id = getNextDatagramId();
int length = data.length;
short fragmentId = 0;
+ byte msgFlags=0;
+ if( droppable )
+ msgFlags = (byte)(msgFlags | Datagram.DROPPABLE_FLAG);
+ if( keepOrder )
+ msgFlags = (byte)(msgFlags | Datagram.KEEP_ORDER_FLAG);
+
Datagram firstFragment = null;
Datagram lastFragment = null;
@@ -215,13 +445,11 @@
// This a MAX size fragment
dg =
new Datagram(
- id,
length,
fragmentId,
nodeId,
channelId,
- droppable,
- keepOrder,
+ msgFlags,
data,
fragmentId * maxFragmentSize,
maxFragmentSize);
@@ -229,13 +457,11 @@
// This a not a MAX size fragment
dg =
new Datagram(
- id,
length,
fragmentId,
nodeId,
channelId,
- droppable,
- keepOrder,
+ msgFlags,
data,
fragmentId * maxFragmentSize,
length - (fragmentId *
maxFragmentSize));
@@ -252,17 +478,9 @@
}
// Send the fragment chain.
- normalStream.send(firstFragment);
- }
-
- // Socket that we will be sending admin data over.
- DatagramSocket adminSocket;
- UDPNodeId nodeId = new UDPNodeId();
- // timmer Used to slow down the transmision of messages.
- volatile long stopSendsTill=0;
-
- // This is set when the transport is set in the cluster.
- public void slowDownSendRate() {
- stopSendsTill = System.currentTimeMillis()+500;
+ if( dest == null )
+ send(firstFragment);
+ else
+ send( (UDPNodeId)dest, firstFragment);
}
}
1.1 jbossmq/src/main/org/jbossmq/cluster/udp/UDPAdminStream.java
Index: UDPAdminStream.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.cluster.udp;
import java.net.DatagramSocket;
import java.net.DatagramPacket;
import java.util.LinkedList;
import java.util.Iterator;
import java.util.HashMap;
import org.jbossmq.Log;
import java.util.Vector;
/**
* The UDPAdminStream class blocks on a UDP socket
* waiting for admin packets.
*
* Admin packet must fit into a single fragment. No
* packet recovery is needed.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
class UDPAdminStream implements Runnable {
// The channel manager owning this Inbound channel
UDPTransport transport;
// The thread that is reciving messages.
private Thread runningThread;
// Used to stop the thread
private boolean done = false;
// We incrementaly number the datagrams we send
private int nextDatagramId = 0;
// The socket we will be listening on
DatagramSocket socket;
/**
* Starts the input thread
*/
synchronized void start() {
if (runningThread != null)
return;
done = false;
runningThread = new Thread(this, "UDPAdminStream");
runningThread.start();
}
/**
* Stops the input thread
*/
synchronized void stop() throws InterruptedException {
if (runningThread == null)
return;
done = true;
runningThread.interrupt();
runningThread.join();
runningThread = null;
}
/**
* The thread of the InboundStream
*/
public void run() {
try {
byte buffer[] = new byte[transport.maxFragmentSize +
Datagram.HEADER_SIZE];
DatagramPacket packet = new java.net.DatagramPacket(buffer,
buffer.length);
// We iterate at least every 2 seconds so that we can
// stop the thread
socket.setSoTimeout(2000);
while (!done) {
try {
// Read in a message from the network
packet.setData(buffer);
packet.setLength(buffer.length);
socket.receive(packet);
// Is the packet ok?
if (packet.getLength() < Datagram.HEADER_SIZE)
{
Log.notice("["+this+"] Packet was too
small, dropping.");
continue;
}
// Build the Datagram object form the network
data
Datagram dg = new Datagram(packet.getData(),
packet.getLength());
dg.getSenderId().address = packet.getAddress();
// Drop the packet if this node sent this
packet
if (transport.nodeId.equals(dg.getSenderId()))
{
continue;
}
// Used to test the packet retransmision
faclities
// of the UDPTransport class. We drop 1 in 10
packets.
java.util.Random r = new java.util.Random();
if (r.nextInt(100) < 10) {
Log.error("["+this+"] TEST DROP: " +
dg.getId());
continue;
}
// Continue processing the datagram
fragmentArrived(dg);
} catch (java.io.InterruptedIOException e) {
} catch (InterruptedException e) {
}
}
} catch (java.io.IOException e) {
e.printStackTrace();
}
}
/**
* This places the datagram in the proper position in the
* message "Stream"
*/
synchronized private void fragmentArrived(Datagram dg) throws
InterruptedException {
transport.datagramArrived(dg);
}
/**
* Returns the next message id in the output stream.
*/
protected int getNextDatagramId() {
// I think ++ is atomic operation in Java.. (No synch needed)
return nextDatagramId++;
}
public String toString() {
return "org.jbossmq.cluster.udp.UDPAdminStream";
}
}
1.1 jbossmq/src/main/org/jbossmq/cluster/udp/UDPStream.java
Index: UDPStream.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.cluster.udp;
import java.net.DatagramSocket;
import java.net.DatagramPacket;
import java.util.LinkedList;
import java.util.Iterator;
import java.util.HashMap;
import org.jbossmq.Log;
import java.util.Vector;
/**
 * The UDPStream class blocks on a UDP socket
 * waiting for input. Since a message is made up
 * of multiple fragments and UDP does not guarantee message
 * delivery, we keep track of the "Stream" of messages
 * being sent by all the nodes in the cluster.
 *
 * The "Stream" is a linked list of MessageState objects.
 * The head of the list is the oldest message that has not been completed
 * yet. The tail of the list is the most recently received message from
 * the node. Messages between the tail and head end could be completely
 * received or in progress. Messages marked as "not required to be in order"
 * are sent to the cluster to be processed by the listeners as soon as they are
 * completed, and removed from the "Stream". When the head message is completed
 * it is sent up to the cluster to be processed and it is removed from the "Stream".
 * When the head message is != to the tail message, this means that a datagram
 * was dropped and the head message needs to be resent, so a resend request is
 * issued.
 * When a new message arrives it is placed at the end of the message stream, but
 * filler messages will be added to the stream to fill the space left by any dropped
 * messages.
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
class UDPStream implements Runnable {
// The maximum size the datagram input stream can grow to.
private static final int MAX_DATAGRAM_STREAM_SIZE = 100;
// The transport owning this inbound stream.
UDPTransport transport;
// The thread that is receiving messages.
private Thread runningThread;
// Used to stop the thread. Volatile because run() polls it without
// holding a lock while start()/stop() write it.
private volatile boolean done = false;
// Up to how many sent packets do we cache?
private int maxSentCacheSize = 200;
// The name of this stream. Used to ease debugging.
public java.lang.String name;
// We incrementally number the datagrams we send.
private int nextDatagramId = 0;
// Cache of sent datagrams, used for error recovery.
private LinkedList sentCache = new LinkedList();
// The socket we will be listening on.
DatagramSocket socket;
// Maps UDPNodeIds to NodeStates.
private HashMap nodes = new HashMap(10);
/**
 * Keeps track of the progress of the reception of one message.
 */
private static class MessageState {
    // Id of the message this state belongs to.
    int id;
    // The fragments of the message; not populated by this class itself.
    Datagram[] fragments;
    // Presumably the time of the most recent resend request for this
    // message - it is maintained outside this class; verify against users.
    long lastResendRequest = 0;
    // Starts out false; maintained outside this class.
    boolean isDropable = false;
    // Starts out false; maintained outside this class.
    boolean arrived = false;

    MessageState(int id) {
        this.id = id;
    }
}
/**
 * Keeps track of the state of one remote node in the cluster.
 */
private static class NodeState {
    // Becomes true once the first packet from the node has been recorded.
    boolean pastFirstPacket = false;
    // Id of the newest datagram recorded for the node.
    int lastDatagramId = 0;
    // The pending MessageState objects of the node, oldest first.
    LinkedList datagramStream = new LinkedList();
    // The node this state describes.
    UDPNodeId nodeId;
}
/**
* Gets the MessageState object for the given Node/Message Id
* This will create new MessageState objects as needed.
*/
private MessageState getMessageState(NodeState nodeState, int id) {
boolean rollOver = id < -5000 && nodeState.lastDatagramId > 5000;
if (!nodeState.pastFirstPacket) {
MessageState t = new MessageState(id);
nodeState.datagramStream.addLast(t);
nodeState.lastDatagramId = id;
nodeState.pastFirstPacket = true;
return t;
} else if (nodeState.lastDatagramId == id) {
// This is weird. A node might have retransmitted a packet
for a
// message that was succesfully recived the first time
(therefore
// we are not pending messages for it in the datagramStream)
if( nodeState.datagramStream.size()==0 )
return null;
// The packet id is the same as the last packet.
return (MessageState) nodeState.datagramStream.getLast();
} else if (nodeState.lastDatagramId < id || rollOver) {
// The packet id is newer than the last packet
int i = nodeState.lastDatagramId;
while (i != id) {
i++;
// Packets got dropped but we record them in the
stream anyways.
MessageState t = new MessageState(i);
nodeState.datagramStream.addLast(t);
if (nodeState.datagramStream.size() >
MAX_DATAGRAM_STREAM_SIZE) {
handleNodeUnresponsive(nodeState);
nodeState.datagramStream.removeFirst();
}
}
nodeState.lastDatagramId = id;
return (MessageState) nodeState.datagramStream.getLast();
} else {
// The packet id older than the last packet. We have to
search for it
// int the datagram stream.
Iterator i = nodeState.datagramStream.iterator();
while (i.hasNext()) {
MessageState ds = (MessageState) i.next();
if (ds.id == id)
return ds;
}
// This is weird. A node might have retransmitted a packet
for a
// message that was succesfully recived the first time
(therefore
// we are not pending messages for it in the datagramStream)
return null;
}
}
/**
 * Called when a node's message stream gets too long.
 * Currently this only logs an error; we should do something else to
 * inform the cluster that a node is being unresponsive.
 *
 * TODO: make it also happen when a message sits too long on
 * the stream.
 */
private void handleNodeUnresponsive(NodeState nodeState) {
    Log.error("NODE IS UNRESPONSIVE!!!");
}
/**
 * Starts the input thread, named after this stream.
 * Does nothing if the thread is already running.
 */
synchronized void start() {
    if (runningThread == null) {
        done = false;
        Thread receiver = new Thread(this, "UDPStream-"+name);
        runningThread = receiver;
        receiver.start();
    }
}
/**
 * Stops the input thread and waits for it to finish.
 * Does nothing if the thread is not running.
 *
 * The instance monitor is released before joining: run() synchronizes on
 * this object in every iteration, so joining while holding the monitor
 * could leave both threads waiting on each other forever.
 *
 * @throws InterruptedException if the caller is interrupted while waiting
 *         for the input thread to finish
 */
void stop() throws InterruptedException {
    Thread receiver;
    synchronized (this) {
        receiver = runningThread;
        if (receiver == null)
            return;
        done = true;
        receiver.interrupt();
    }

    receiver.join();

    synchronized (this) {
        // Only clear the reference if it still points at the thread we stopped.
        if (runningThread == receiver)
            runningThread = null;
    }
}
/**
* The thread of the InboundStream
*/
public void run() {
try {
byte buffer[] = new byte[transport.maxFragmentSize +
Datagram.HEADER_SIZE];
DatagramPacket packet = new java.net.DatagramPacket(buffer,
buffer.length);
// We iterate at least every 500 miliseconds so that we can
// dispatch resend request messages frequently
socket.setSoTimeout(500);
while (!done) {
try {
// We might be doing this too often..
synchronized( this ) {
Iterator i = nodes.values().iterator();
while( i.hasNext() ) {
NodeState nodeState =
(NodeState)i.next();
dispatchDatagrams(nodeState);
}
}
// Read in a message from the network
packet.setData(buffer);
packet.setLength(buffer.length);
socket.receive(packet);
// Is the packet ok?
if (packet.getLength() < Datagram.HEADER_SIZE)
{
Log.notice("["+this+"] Packet was too
small, dropping.");
continue;
}
// Build the Datagram object form the network
data
Datagram dg = new Datagram(packet.getData(),
packet.getLength());
dg.getSenderId().address = packet.getAddress();
// Drop the packet if this node sent this
packet
if (transport.nodeId.equals(dg.getSenderId()))
{
continue;
}
// Used to test the packet retransmision
faclities
// of the UDPTransport class. We drop 1 in 10
packets.
java.util.Random r = new java.util.Random();
if (r.nextInt(100) < 10) {
Log.error("["+this+"] TEST DROP: " +
dg.getId());
continue;
}
// Continue processing the datagram
fragmentArrived(dg);
} catch (java.io.InterruptedIOException e) {
} catch (InterruptedException e) {
}
}
} catch (java.io.IOException e) {
e.printStackTrace();
}
}
/**
* Adds a packet to the sent packet cache
* Removes the oldest packet if the cache is full.
*/
void addToSentCache(Datagram dg) {
    // Droppable datagrams are never cached.
    final boolean droppable = dg.isMessageFlagsSet(Datagram.DROPPABLE_FLAG);
    if (!droppable) {
        synchronized (sentCache) {
            // Newest entries live at the head of the list ...
            sentCache.addFirst(dg);
            // ... so the entry evicted on overflow is the oldest one.
            final boolean overflow = sentCache.size() > maxSentCacheSize;
            if (overflow) {
                sentCache.removeLast();
            }
        }
    }
}
/**
* This method does the bulk of the "Stream" management.
* - it delivers messages that are complete.
* - drops messages that are incomplete and droppable.
* - Requests retransmission of the incomplete packets at the
* head of the stream.
*/
private void dispatchDatagrams(NodeState nodeState) throws InterruptedException {
    if (nodeState.datagramStream.isEmpty()) {
        return;
    }
    // The newest entry is never dropped or re-requested while incomplete.
    final Object tail = nodeState.datagramStream.getLast();
    final Vector pendingIds = new Vector();
    MessageState oldestGap = null;
    // True until the first incomplete entry has been seen.
    boolean headContiguous = true;
    for (Iterator it = nodeState.datagramStream.iterator(); it.hasNext();) {
        final MessageState entry = (MessageState) it.next();
        if (!entry.arrived) {
            headContiguous = false;
            if (entry == tail) {
                // Leave the most recent message alone.
            } else if (entry.isDropable) {
                it.remove();
            } else {
                if (oldestGap == null) {
                    oldestGap = entry;
                }
                pendingIds.addElement(new Integer(entry.id));
            }
            continue;
        }
        // Complete message: deliver it unless it must keep its order and
        // an incomplete message still sits in front of it.
        final Datagram first = entry.fragments[0];
        if (headContiguous || !first.isMessageFlagsSet(Datagram.KEEP_ORDER_FLAG)) {
            transport.datagramArrived(first);
            it.remove();
        }
    }
    if (oldestGap == null) {
        return;
    }
    // Ask for the missing ids again only if more than 200 ms have passed
    // since the last request recorded on the first missing message.
    final long sinceLastRequest = System.currentTimeMillis() - oldestGap.lastResendRequest;
    if (sinceLastRequest > 200) {
        transport.requestResend(nodeState.nodeId, pendingIds, this == transport.broadcastStream);
        oldestGap.lastResendRequest = System.currentTimeMillis();
    }
}
/**
* This places the datagram in the proper position in the
* message "Stream"
*/
synchronized private void fragmentArrived(Datagram dg) throws InterruptedException {
    UDPNodeId senderId = dg.getSenderId();
    int id = dg.getId();
    short fragmentId = dg.getFragmentId();
    Log.log("["+this+"] Processing datagram: " + id);
    NodeState nodeState = getNodeState(senderId);
    MessageState ds = getMessageState(nodeState, id);
    // We might have already received this message.
    if (ds == null || ds.arrived)
        return;
    int fragmentCount = dg.getFragmentCount(transport.maxFragmentSize);
    if (fragmentCount == 1) {
        // The entire message was contained within this datagram
        ds.fragments = new Datagram[1];
        ds.fragments[0] = dg;
        ds.arrived = true;
        return;
    }
    // The message is fragmented.
    if (ds.fragments == null) {
        // The count comes off the wire; refuse values that cannot size the
        // fragment array instead of letting an unchecked exception kill
        // the receive thread.
        if (fragmentCount < 1) {
            Log.notice("["+this+"] Invalid fragment count " + fragmentCount
                + " in datagram " + id + ", dropping.");
            return;
        }
        ds.fragments = new Datagram[fragmentCount];
        ds.isDropable = dg.isMessageFlagsSet(Datagram.DROPPABLE_FLAG);
    }
    // The fragment id also comes off the wire: it must index into the
    // array that was sized by the first fragment seen for this message.
    if (fragmentId < 0 || fragmentId >= ds.fragments.length) {
        Log.notice("["+this+"] Invalid fragment id " + fragmentId
            + " in datagram " + id + ", dropping.");
        return;
    }
    ds.fragments[fragmentId] = dg;
    // Check to see if we have received all the fragments.
    boolean messageLoaded = true;
    // iterating backwards will break us out of the loop
    // quicker in the common case
    for (int i = ds.fragments.length - 1; i >= 0; i--) {
        if (ds.fragments[i] == null) {
            messageLoaded = false;
            break;
        }
    }
    if (messageLoaded) {
        // chain the fragments together
        for (int i = 0; i < ds.fragments.length - 1; i++)
            ds.fragments[i].nextFragment = ds.fragments[i + 1];
        ds.arrived = true;
    }
}
/**
* Looks for a packet in the sent cache.
* returns null if the packet could not be found.
*/
public Datagram getFromSentCache(int messageId) {
    // Copy the cache under its lock, then search the copy without holding it.
    final LinkedList snapshot;
    synchronized (sentCache) {
        snapshot = (LinkedList) sentCache.clone();
    }
    Datagram match = null;
    for (java.util.Iterator it = snapshot.iterator(); match == null && it.hasNext();) {
        Datagram candidate = (Datagram) it.next();
        if (candidate.getId() == messageId) {
            match = candidate;
        }
    }
    // Still null when no cached datagram carries the requested id.
    return match;
}
/**
* Returns the next message id in the output stream.
*/
protected int getNextDatagramId() {
    // The post-increment is a read-modify-write, not an atomic operation,
    // so it is guarded to stop concurrent callers from receiving the same
    // id. A dedicated lock is used rather than this object's monitor,
    // which the input thread holds while dispatching datagrams.
    synchronized (nextDatagramIdLock) {
        return nextDatagramId++;
    }
}
// Guards the increment of nextDatagramId in getNextDatagramId().
private final Object nextDatagramIdLock = new Object();
/**
* Gets the NodeState for the given node id.
* if none exists, a new NodeState object is created.
*/
private NodeState getNodeState(UDPNodeId nodeId) {
    Object known = nodes.get(nodeId);
    if (known != null) {
        return (NodeState) known;
    }
    // First time this node id is seen: register a fresh state for it.
    NodeState created = new NodeState();
    created.nodeId = nodeId;
    nodes.put(nodeId, created);
    return created;
}
/**
* This is used by the AdminPacketHandler. Called when
* a packet cannot be retransmitted by the host (It was droppable or
* the packet was not in the packet cache).
*
* We mark the packet as droppable so it gets removed from the stream
* the next time the messages are dispatched on the stream.
*/
synchronized void removeFromDatagramStream(UDPNodeId nodeId, int datagramId) {
    final NodeState sender = getNodeState(nodeId);
    for (Iterator it = sender.datagramStream.iterator(); it.hasNext();) {
        final MessageState pending = (MessageState) it.next();
        if (pending.id != datagramId) {
            continue;
        }
        // Only flag the entry here; nothing is removed from the stream
        // by this method.
        pending.isDropable = true;
    }
}
/**
* This is a descriptive name for the stream. Used for debugging messages.
*/
public void setName(java.lang.String newName) {
    // The name appears in toString() and in the input thread's name.
    this.name = newName;
}
public String toString() {
    // Fixed class-name prefix followed by the descriptive stream name.
    final String prefix = "org.jbossmq.cluster.udp.UDPStream:";
    return prefix + name;
}
}