User: hiram
Date: 01/02/21 22:12:30
Modified: src/main/org/jbossmq/cluster/udp Datagram.java
UDPTransport.java
Added: src/main/org/jbossmq/cluster/udp AdminStream.java
NormalStream.java UDPNodeId.java
Removed: src/main/org/jbossmq/cluster/udp AdminPacketHandler.java
InboundStream.java
Log:
Improved ease of use by removing the requirement to explicitly set the
nodeId of a cluster.
We now use normal UDP packets for resend requests (in other words, resend requests
are no longer broadcast).
Message transmission rates are slowed down when a node starts receiving resend
requests.
Revision Changes Path
1.2 +46 -34 jbossmq/src/main/org/jbossmq/cluster/udp/Datagram.java
Index: Datagram.java
===================================================================
RCS file:
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/udp/Datagram.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- Datagram.java 2001/02/15 03:33:07 1.1
+++ Datagram.java 2001/02/22 06:12:29 1.2
@@ -17,7 +17,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
class Datagram {
@@ -29,7 +29,7 @@
final static int ID_OFFSET = 0; // int
final static int LENGTH_OFFSET = 4; // int
final static int FRAGMENT_ID_OFFSET = 8; // short
- final static int SENDER_ID_OFFSET = 10; // short
+ final static int SENDER_ID_OFFSET = 10; // 2 bytes
final static int TOPIC_ID_OFFSET = 12; // short
final static byte MESSAGE_FLAGS_OFFSET = 14; // byte
final static int HEADER_SIZE = 15;
@@ -50,33 +50,7 @@
}
}
- /**
- * This is used to build a new datagram
- */
- Datagram(
- int id,
- int length,
- short fragmentId,
- short senderId,
- short topic,
- boolean droppable,
- boolean keepOrder,
- byte payload[],
- int offset,
- int len) {
-
- data = new byte[len + HEADER_SIZE];
- SerializerUtil.writeIntTo(id, data, ID_OFFSET);
- SerializerUtil.writeIntTo(length, data, LENGTH_OFFSET);
- SerializerUtil.writeShortTo(fragmentId, data, FRAGMENT_ID_OFFSET);
- SerializerUtil.writeShortTo(senderId, data, SENDER_ID_OFFSET);
- SerializerUtil.writeShortTo(topic, data, TOPIC_ID_OFFSET);
- data[MESSAGE_FLAGS_OFFSET] =
- (byte) (((droppable ? 1 : 0) << DROPPABLE_MESSAGE_FLAG) |
((keepOrder ? 1 : 0) << KEEP_ORDER_MESSAGE_FLAG));
- for (int i = HEADER_SIZE, j = offset; i < data.length; i++, j++)
- data[i] = payload[j];
- }
/**
* Gets the original payload of the datagram.
@@ -121,13 +95,7 @@
return SerializerUtil.readIntFrom(data, LENGTH_OFFSET);
}
- /**
- * The id of the sender of this message.
- * @return int
- */
- short getSenderId() {
- return SerializerUtil.readShortFrom(data, SENDER_ID_OFFSET);
- }
+
/**
* The topic under which this message was sent.
@@ -143,5 +111,49 @@
boolean isKeepOrder() {
return ((data[MESSAGE_FLAGS_OFFSET] >> KEEP_ORDER_MESSAGE_FLAG) &
0x01) != 0;
+ }
+
+ UDPNodeId senderId;
+
+ /**
+ * This is used to build a new datagram
+ */
+ Datagram(
+ int id,
+ int length,
+ short fragmentId,
+ UDPNodeId senderId,
+ short topic,
+ boolean droppable,
+ boolean keepOrder,
+ byte payload[],
+ int offset,
+ int len) {
+
+ data = new byte[len + HEADER_SIZE];
+ SerializerUtil.writeIntTo(id, data, ID_OFFSET);
+ SerializerUtil.writeIntTo(length, data, LENGTH_OFFSET);
+ SerializerUtil.writeShortTo(fragmentId, data, FRAGMENT_ID_OFFSET);
+ senderId.writeToByteArray(data, SENDER_ID_OFFSET);
+ SerializerUtil.writeShortTo(topic, data, TOPIC_ID_OFFSET);
+ data[MESSAGE_FLAGS_OFFSET] =
+ (byte) (((droppable ? 1 : 0) << DROPPABLE_MESSAGE_FLAG) |
((keepOrder ? 1 : 0) << KEEP_ORDER_MESSAGE_FLAG));
+
+ for (int i = HEADER_SIZE, j = offset; i < data.length; i++, j++)
+ data[i] = payload[j];
+ }
+
+ /**
+ * The id of the sender of this message.
+ * @return int
+ */
+ UDPNodeId getSenderId() {
+ if( senderId != null )
+ return senderId;
+
+ UDPNodeId t = new UDPNodeId();
+ t.readFromByteArray(data, SENDER_ID_OFFSET);
+ senderId = t;
+ return t;
}
}
1.2 +57 -50 jbossmq/src/main/org/jbossmq/cluster/udp/UDPTransport.java
Index: UDPTransport.java
===================================================================
RCS file:
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/udp/UDPTransport.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- UDPTransport.java 2001/02/15 03:33:07 1.1
+++ UDPTransport.java 2001/02/22 06:12:29 1.2
@@ -28,33 +28,34 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class UDPTransport implements org.jbossmq.cluster.Transport {
// The class responsible for pulling messages from the network
- InboundStream inBoundStream = new InboundStream();
+ NormalStream normalStream = new NormalStream();
// The interface into the cluster we report message arivals to
TransportListener transportListener;
// We incrementaly number the datagrams we send
int nextDatagramId = 0;
- AdminPacketHandler adminPacketHandler = new AdminPacketHandler();
+ AdminStream adminStream = new AdminStream();
org.jbossmq.cluster.Cluster cluster;
// Adderes datagrams will be sent to
- InetAddress datagramAddDestination;
+
+ InetAddress clusterAddress;
// Port datagrams will be sent to
- int datagramPortDestination;
+ int clusterPort;
// A message can be composed of multiple datagram fragments.
int maxFragmentSize = 1024;
// Socket that we will be sending data over.
- DatagramSocket socket;
+ DatagramSocket normalSocket;
boolean started = false;
// I like simple constructors.
public UDPTransport() {
- inBoundStream.transport = this;
- adminPacketHandler.transport = this;
+ normalStream.transport = this;
+ adminStream.transport = this;
}
@@ -92,7 +93,6 @@
// This is set when the transport is set in the cluster.
public void setCluster(org.jbossmq.cluster.Cluster newCluster) {
cluster = newCluster;
- cluster.addTopicListener(cluster.MANAGMENT_TOPIC, adminPacketHandler);
}
// This is set when the transport is set in the cluster.
@@ -116,8 +116,9 @@
String transportMode = (String) t.get("TransportMode");
String groupString = (String) t.get("Group");
- String useInterfaceString = (String) t.get("UseInterface");
String portString = (String) t.get("Port");
+ String nodeInterfaceString = (String) t.get("NodeInterface");
+ String nodePortString = (String) t.get("NodePort");
if (transportMode == null) {
throw new
org.jbossmq.cluster.InvalidConfigurationException("TransportMode property not set");
@@ -133,40 +134,36 @@
InetAddress group = InetAddress.getByName(groupString);
int port = Integer.parseInt(portString);
+ DatagramSocket a;
MulticastSocket s = new MulticastSocket(port);
s.joinGroup(group);
+
+ if (nodeInterfaceString != null) {
+ nodeId.address =
InetAddress.getByName(nodeInterfaceString);
+ } else {
+ nodeId.address = InetAddress.getLocalHost();
+ }
+ s.setInterface(nodeId.address);
- if (useInterfaceString != null) {
-
s.setInterface(InetAddress.getByName(useInterfaceString));
+ if (nodePortString != null) {
+ nodeId.port = Integer.parseInt(nodePortString);
+ a = new DatagramSocket(nodeId.port);
+ } else {
+ a = new DatagramSocket();
+ nodeId.port = a.getLocalPort();
}
- datagramPortDestination = port;
- datagramAddDestination = group;
- socket = s;
+ System.out.println("Local NodeId: "+nodeId);
+
+ clusterPort = port;
+ clusterAddress = group;
+ normalSocket = s;
+ adminSocket = a;
} catch (IOException e) {
throw new
org.jbossmq.cluster.InvalidConfigurationException(
"Ip group of the Group property was invalid: "
+ e.getMessage());
}
- } else if (transportMode.equals("Broadcast")) {
- if (portString == null) {
- throw new
org.jbossmq.cluster.InvalidConfigurationException("Port property not set");
- }
-
- try {
-
- int port = Integer.parseInt(portString);
- DatagramSocket s = new DatagramSocket(port);
- InetAddress group =
InetAddress.getByName("255.255.255.255");
-
- datagramPortDestination = port;
- datagramAddDestination = group;
- socket = s;
-
- } catch (IOException e) {
- throw new
org.jbossmq.cluster.InvalidConfigurationException("Port property was invalid: " +
e.getMessage());
- }
-
} else {
throw new
org.jbossmq.cluster.InvalidConfigurationException("TransportMode property invalid");
}
@@ -174,30 +171,22 @@
// Starts the transport
synchronized public void start() throws
org.jbossmq.cluster.InvalidStateException {
- if (socket == null)
+ if (normalSocket == null)
throw new org.jbossmq.cluster.InvalidStateException("The
transport properties have not been set yet.");
started = true;
- inBoundStream.start();
+ normalStream.start();
+ adminStream.start();
}
// Stops the transport
synchronized public void stop() throws InterruptedException {
- inBoundStream.stop();
+ normalStream.stop();
+ adminStream.stop();
started = false;
}
- // Sends a datagram fragment chain over the network
- void send(Datagram dg) throws InterruptedException, java.io.IOException {
- // Send the fragment chain.
- adminPacketHandler.addToSentCache(dg);
- while (dg != null) {
- DatagramPacket packet =
- new java.net.DatagramPacket(dg.data, dg.data.length,
datagramAddDestination, datagramPortDestination);
- socket.send(packet);
- dg = dg.nextFragment;
- }
- }
+
// Builds a datagram fragment chain with the given data
// and then sends it over the network
@@ -210,6 +199,13 @@
Datagram firstFragment = null;
Datagram lastFragment = null;
+ // Check to see if the transmision rate has been slowed down.
+ long waitLeft = stopSendsTill - System.currentTimeMillis();
+ while( waitLeft > 0) {
+ Thread.sleep(waitLeft);
+ waitLeft = stopSendsTill - System.currentTimeMillis();
+ }
+
// while the whole message has not been proccesed
while (fragmentId * maxFragmentSize < length) {
@@ -222,7 +218,7 @@
id,
length,
fragmentId,
- cluster.getNodeId(),
+ nodeId,
channelId,
droppable,
keepOrder,
@@ -236,7 +232,7 @@
id,
length,
fragmentId,
- cluster.getNodeId(),
+ nodeId,
channelId,
droppable,
keepOrder,
@@ -256,6 +252,17 @@
}
// Send the fragment chain.
- send(firstFragment);
+ normalStream.send(firstFragment);
+ }
+
+ // Socket that we will be sending admin data over.
+ DatagramSocket adminSocket;
+ UDPNodeId nodeId = new UDPNodeId();
+ // timmer Used to slow down the transmision of messages.
+ volatile long stopSendsTill=0;
+
+ // This is set when the transport is set in the cluster.
+ public void slowDownSendRate() {
+ stopSendsTill = System.currentTimeMillis()+500;
}
}
1.1 jbossmq/src/main/org/jbossmq/cluster/udp/AdminStream.java
Index: AdminStream.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.cluster.udp;
import java.util.LinkedList;
import java.net.DatagramSocket;
import java.net.DatagramPacket;
import java.io.ObjectOutputStream;
import java.io.ByteArrayInputStream;
import java.io.Externalizable;
import java.io.ObjectInputStream;
import java.io.IOException;
import java.io.ByteArrayOutputStream;
import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
/**
* This class manages the Admin Message Stream. The AdminStream is used
* to request packet retransmisions. When packets get sent, they are placed
* on a packet cache. Other nodes receiving packets may send a request
* on the AdminStream for this node to retransmit a packet. If the packet
* is in the cache, it is retransmited over the NormalStream. If not, a
* reply packet is sent on the AdminStream reporting that the message was
* not in the cache.
*
* This class works closely with the NormalStream class to implement
* the error recovery stated above.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
class AdminStream implements Runnable {
// The transport owning this Inbound channel
UDPTransport transport;
// Used to for error recovery.
LinkedList sentCache = new LinkedList();
// Up to how many sent packets do we cache?
int maxSentCacheSize = 200;
//
// This is the message sent to request a packet be resent
//
static class ResendAdminDatagram implements java.io.Serializable {
ResendAdminDatagram(int j) {
datagramId = j;
}
int datagramId;
}
//
// This is the message sent when the requested packet is not in the cache.
//
static class ResendErrorAdminDatagram implements java.io.Serializable {
ResendErrorAdminDatagram(int j) {
datagramId = j;
}
int datagramId;
}
// Adds a packet to the cache
void addToSentCache(Datagram dg) {
synchronized (sentCache) {
sentCache.addFirst(dg);
if (sentCache.size() > maxSentCacheSize)
sentCache.removeLast();
}
}
boolean done;
// The thread that is reciving messages.
Thread runningThread;
// This send is used to send the administrative messages.
public void adminSend(UDPNodeId nodeId, Object message) throws IOException,
InterruptedException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream os = new ObjectOutputStream(bos);
os.writeObject(message);
os.close();
byte data[] = bos.toByteArray();
Datagram dg = new
Datagram(0,data.length,(short)0,transport.nodeId,(short)0,true,false,data,0,data.length);
DatagramPacket packet = new java.net.DatagramPacket(dg.data,
dg.data.length, nodeId.address, nodeId.port);
transport.adminSocket.send(packet);
}
/**
* This method is called when a message is recived over the network
* on the managment topic.
*/
private void processDatagram(Datagram dg) {
try {
Object o;
ByteArrayInputStream bais = new
ByteArrayInputStream(dg.getData());
ObjectInputStream is = new ObjectInputStream(bais);
o = is.readObject();
is.close();
if (o instanceof ResendAdminDatagram) {
ResendAdminDatagram radg = (ResendAdminDatagram)o;
System.out.println("[Cluster] RESEND REQUEST ARRIVED:
" + radg.datagramId);
resendDatagram(dg.getSenderId(), radg.datagramId);
} else if (o instanceof ResendErrorAdminDatagram) {
ResendErrorAdminDatagram readg =
(ResendErrorAdminDatagram)o;
System.out.println("[Cluster] RESEND REQUEST FAILED: "
+ readg.datagramId);
transport.normalStream.removeFromDatagramStream(dg.getSenderId(), readg.datagramId);
}
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// Sends a resend request
void requestResend(UDPNodeId nodeId, int messageId) {
System.out.println("[Cluster] SENDING RESEND REQUEST: " + messageId);
ResendAdminDatagram m = new ResendAdminDatagram(messageId);
try {
adminSend(nodeId, m);
} catch (Exception e) {
e.printStackTrace();
}
}
// Resends the message if it was found in the cache
void resendDatagram(UDPNodeId requestor, int messageId) throws
InterruptedException, java.io.IOException {
LinkedList t;
synchronized (sentCache) {
t = (LinkedList) sentCache.clone();
}
java.util.Iterator i = t.iterator();
while (i.hasNext()) {
Datagram dg = (Datagram) i.next();
if (dg.getId() == messageId) {
System.out.println("[Cluster] RESEND REQUEST SERVICED:
" + messageId);
System.out.println("Resending : " + messageId);
transport.slowDownSendRate();
transport.normalStream.send(dg);
return;
}
}
System.out.println("[Cluster] RESEND REQUEST NO SERVICED (not in
cache): " + messageId);
// The datagram had been removed from the cache...
ResendErrorAdminDatagram error = new
ResendErrorAdminDatagram(messageId);
adminSend(requestor, error);
}
/**
*
*/
public void run() {
try {
byte buffer[] = new byte[transport.maxFragmentSize +
Datagram.HEADER_SIZE];
DatagramPacket packet = new java.net.DatagramPacket(buffer,
buffer.length);
// We iterate at least every 2 seconds so that we can quit the
loop
transport.adminSocket.setSoTimeout(2000);
while (!done) {
try {
// Read in a message from the network
packet.setData(buffer);
packet.setLength(buffer.length);
transport.adminSocket.receive(packet);
// Is the packet ok?
if (packet.getLength() < Datagram.HEADER_SIZE)
{
System.out.println("Packet was too
small, dropping.");
continue;
}
// Build the Datagram object form the network
data
Datagram dg = new Datagram(packet.getData(),
packet.getLength());
dg.getSenderId().address = packet.getAddress();
// Drop the packet if this node sent this
packet
if (transport.nodeId.equals(dg.getSenderId()))
{
continue;
}
// Continue processing the datagram
processDatagram(dg);
} catch (java.io.InterruptedIOException e) {
}
}
} catch (java.io.IOException e) {
e.printStackTrace();
}
}
// Stops the input thread
synchronized void start() {
if (runningThread != null)
return;
done = false;
runningThread = new Thread(this, "UDPTransport AdminStream");
runningThread.setPriority(Thread.MAX_PRIORITY);
runningThread.start();
}
// Starts the input thread
synchronized void stop() throws InterruptedException {
if (runningThread == null)
return;
done = true;
runningThread.interrupt();
runningThread.join();
runningThread = null;
}
}
1.1 jbossmq/src/main/org/jbossmq/cluster/udp/NormalStream.java
Index: NormalStream.java
===================================================================
/*
* JBossMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.jbossmq.cluster.udp;
import java.net.DatagramSocket;
import java.net.DatagramPacket;
import java.util.LinkedList;
import java.util.Iterator;
import java.util.HashMap;
/**
* The InboundStream class blocks on the socket
* waiting for input. Since a message is make up
* of multiple fragments and UDP does not garantee message
* delivery, we keep track of the "Stream" of message
* being sent by all the nodes in the cluster.
*
* The "Stream" is a linked list of MessageState objects.
* The head of the list is the oldest message that has not been completed
* yet. The tail of the list is the most recently received message from
* the node. Messages between the tail and head end could be completely
* received or in progess. Message marked as "not required to be in order"
* are sent to the cluster to be processed by the listners as soon as they are
* completed, and removed form the "Stream". When the head message is completed
* it is sent up to the cluster to be processed and it is removed from the "stream"
* When the head message is != to the tail message, this means that a datagram
* was dropped and the head message needs to be resent, so a resend request is
issued.
* When a new message arrives it is placed at the end of the message stream but
* filler message will be added to the stream to fill the space left by any dropped
* messages.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
class NormalStream implements Runnable {
// The channel manager owning this Inbound channel
UDPTransport transport;
// The thread that is reciving messages.
private Thread runningThread;
// Used to stop the thread
private boolean done = false;
// Maps UDPNodeIds to NodeStates
private HashMap nodes = new HashMap(10);
// Keeps track of the progress of message reception
private static class MessageState {
MessageState(int id) {
this.id = id;
}
int id;
boolean arrived = false;
boolean isDropable = false;
long lastResendRequest = 0;
Datagram[] fragments;
}
// Keeps track of the state of a remote nodes in the cluster
private static class NodeState {
UDPNodeId nodeId;
LinkedList datagramStream = new LinkedList();
int lastDatagramId = 0;
boolean pastFirstPacket = false;
}
// The maximum size the datagram stream can grow to.
static final int MAX_DATAGRAM_STREAM_SIZE = 100;
private MessageState getMessageState(NodeState nodeState, int id) {
boolean rollOver = id < -5000 && nodeState.lastDatagramId > 5000;
if (!nodeState.pastFirstPacket) {
MessageState t = new MessageState(id);
nodeState.datagramStream.addLast(t);
nodeState.lastDatagramId = id;
nodeState.pastFirstPacket = true;
return t;
} else if (nodeState.lastDatagramId == id) {
// The packet id is the same as the last packet.
return (MessageState) nodeState.datagramStream.getLast();
} else if (nodeState.lastDatagramId < id || rollOver) {
// The packet id is newer than the last packet
int i = nodeState.lastDatagramId;
while (i != id) {
i++;
// Packets got dropped but we record them in the
stream anyways.
MessageState t = new MessageState(i);
nodeState.datagramStream.addLast(t);
if (nodeState.datagramStream.size() >
MAX_DATAGRAM_STREAM_SIZE) {
handleNodeUnresponsive(nodeState);
return null;
}
}
nodeState.lastDatagramId = id;
return (MessageState) nodeState.datagramStream.getLast();
} else {
// The packet id older than the last packet. We have to
search for it
// int the datagram stream.
Iterator i = nodeState.datagramStream.iterator();
while (i.hasNext()) {
MessageState ds = (MessageState) i.next();
if (ds.id == id)
return ds;
}
// This is weird. A node might have retransmitted a packet
for a
// message that was succesfully recived the first time
(therefore
// we are not pending messages for it in the datagramStream)
return null;
}
}
// We should do something else to inform the cluster that
// a node is being unresponsive. This could be due to node
// failure or network failure.
// This happens when the message stream gets too long.
// TODO: make it also happen when a message sits too long on
// the stream.
private void handleNodeUnresponsive(NodeState nodeState) {
System.out.println("NODE IS UNRESPONSIVE!!!");
}
// Stops the input thread
synchronized void start() {
if (runningThread != null)
return;
done = false;
runningThread = new Thread(this, "UDPTransport NormalStream");
runningThread.start();
}
// Starts the input thread
synchronized void stop() throws InterruptedException {
if (runningThread == null)
return;
done = true;
runningThread.interrupt();
runningThread.join();
runningThread = null;
}
/**
* The thread of the InboundStream
*/
public void run() {
try {
byte buffer[] = new byte[transport.maxFragmentSize +
Datagram.HEADER_SIZE];
DatagramPacket packet = new java.net.DatagramPacket(buffer,
buffer.length);
// We iterate at least every 500 miliseconds so that we can
// dispatch resend request messages frequently
transport.normalSocket.setSoTimeout(500);
while (!done) {
try {
// We might be doing this too often..
synchronized( this ) {
Iterator i = nodes.values().iterator();
while( i.hasNext() ) {
NodeState nodeState =
(NodeState)i.next();
dispatchMessages(nodeState);
}
}
// Read in a message from the network
packet.setData(buffer);
packet.setLength(buffer.length);
transport.normalSocket.receive(packet);
// Is the packet ok?
if (packet.getLength() < Datagram.HEADER_SIZE)
{
System.out.println("Packet was too
small, dropping.");
continue;
}
// Build the Datagram object form the network
data
Datagram dg = new Datagram(packet.getData(),
packet.getLength());
dg.getSenderId().address = packet.getAddress();
// Drop the packet if this node sent this
packet
if (transport.nodeId.equals(dg.getSenderId()))
{
continue;
}
/*
// Used to test the packet retransmision
faclities
// of the UDPTransport class. We drop 1 in 10
packets.
java.util.Random r = new java.util.Random();
if (r.nextInt(100) < 10) {
System.out.println("Test packet DROP:
" + dg.getId());
continue;
}
*/
// Continue processing the datagram
processDatagram(dg);
} catch (java.io.InterruptedIOException e) {
} catch (InterruptedException e) {
}
}
} catch (java.io.IOException e) {
e.printStackTrace();
}
}
/**
* This places the datagram in the proper position in the
* message "Stream"
*/
synchronized private void processDatagram(Datagram dg) throws
InterruptedException {
UDPNodeId senderId = dg.getSenderId();
int id = dg.getId();
short fragmentId = dg.getFragmentId();
NodeState nodeState = getNodeState(senderId);
MessageState ds = getMessageState(nodeState, id);
System.out.println("Processing datagram: " + id);
// We might have allready received this message.
if (ds == null || ds.arrived)
return;
if (dg.getFragmentCount(transport.maxFragmentSize) == 1) {
// The entire message was contained within this datagram
ds.arrived = true;
ds.fragments = new Datagram[1];
ds.fragments[0] = dg;
} else {
// The message is fragmented.
if (ds.fragments == null) {
ds.fragments = new
Datagram[dg.getFragmentCount(transport.maxFragmentSize)];
ds.isDropable = dg.isDroppable();
}
ds.fragments[fragmentId] = dg;
// Check to see if we have recived all the fragments?
boolean messageLoaded = true;
// iterating backwards will break us out of the loop
// quicker in the common case
for (int i = ds.fragments.length - 1; i >= 0; i--) {
if (ds.fragments[i] == null) {
messageLoaded = false;
break;
}
}
if (messageLoaded) {
// chain the fragments together
for (int i = 0; i < ds.fragments.length - 1; i++)
ds.fragments[i].nextFragment = ds.fragments[i
+ 1];
ds.arrived = true;
}
}
}
/**
* This method does the bulk of the "Stream" managment.
* - it delivers messages that are complete.
* - drops messages that are incomplete and droppable.
* - Requests retransmision of the incomplete packet at the
* head of the stream.
*/
private void dispatchMessages(NodeState nodeState) throws InterruptedException
{
if (nodeState.datagramStream.size() == 0)
return;
boolean atFront = true;
Object lastDS = nodeState.datagramStream.getLast();
Iterator i = nodeState.datagramStream.iterator();
MessageState firstMissing = null;
while (i.hasNext()) {
MessageState ds = (MessageState) i.next();
if (ds.arrived) {
if (atFront || !ds.fragments[0].isKeepOrder()) {
transport.messageArrivedEvent(ds.fragments[0]);
i.remove();
}
continue;
} else {
atFront = false;
if (ds == lastDS) {
continue;
} else if (ds.isDropable) {
i.remove();
} else {
if (firstMissing == null) {
firstMissing = ds;
}
}
}
}
if (firstMissing != null && System.currentTimeMillis() -
firstMissing.lastResendRequest > 100) {
transport.adminStream.requestResend(nodeState.nodeId,
firstMissing.id);
firstMissing.lastResendRequest = System.currentTimeMillis();
}
}
/**
* Gets the NodeState for the given node id.
* if none exists, a new NodeState object is created.
*/
private NodeState getNodeState(UDPNodeId nodeId) {
NodeState rc;
rc = (NodeState)nodes.get( nodeId );
if (rc==null) {
rc = new NodeState();
rc.nodeId = nodeId;
nodes.put(nodeId, rc);
}
return (NodeState)rc;
}
/**
* This is used by the AdminPacketHandler. Called when
* a packet cannot be retransmited by the host (It was droppable or
* packet was not in the packet cache).
*
* We mark the packet as droppable so it gets removed from the stream
* the next time the messages are dispatch on the stream.
*/
synchronized void removeFromDatagramStream(UDPNodeId nodeId, int datagramId) {
NodeState nodeState = getNodeState(nodeId);
Iterator i = nodeState.datagramStream.iterator();
while (i.hasNext()) {
MessageState ds = (MessageState) i.next();
if (ds.id == datagramId)
ds.isDropable = true;
}
}
// Sends a datagram fragment chain over the network
synchronized void send(Datagram dg) throws InterruptedException,
java.io.IOException {
// Send the fragment chain.
transport.adminStream.addToSentCache(dg);
System.out.println("Sending datagram: "+dg.getId());
while (dg != null) {
DatagramPacket packet =
new java.net.DatagramPacket(dg.data, dg.data.length,
transport.clusterAddress, transport.clusterPort);
transport.normalSocket.send(packet);
dg = dg.nextFragment;
}
}
}
1.1 jbossmq/src/main/org/jbossmq/cluster/udp/UDPNodeId.java
Index: UDPNodeId.java
===================================================================
package org.jbossmq.cluster.udp;
import java.io.Externalizable;
import java.net.InetAddress;
import org.jbossmq.cluster.NodeId;
import org.jbossmq.cluster.SerializerUtil;
/**
 * Identifies a node of the UDP transport by the address and port it
 * can be reached on.
 *
 * Only the port is written to / read from the datagram header (see
 * {@link #writeToByteArray} and {@link #readFromByteArray}); the address
 * field has to be filled in separately by whoever reads an id off the wire.
 *
 * Instances are used as HashMap keys, so the fields should not be changed
 * after an id has been put into a map.
 */
public class UDPNodeId implements NodeId {

    // Address of the node; may be null until it has been assigned.
    InetAddress address;

    // UDP port of the node.
    int port;

    /**
     * Creates an id with no address and port 0.
     */
    public UDPNodeId() {
        super();
    }

    /**
     * Two ids are equal when they have the same port and the same address
     * (two unset addresses count as the same).
     *
     * @param obj the object to compare with; may be null
     * @return true if obj is a UDPNodeId for the same address and port
     */
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (!(obj instanceof UDPNodeId)) {
            return false;
        }
        UDPNodeId other = (UDPNodeId) obj;
        if (other.port != port) {
            return false;
        }
        if (other.address == null) {
            return address == null;
        }
        return other.address.equals(address);
    }

    /**
     * Hash code consistent with {@link #equals}: combines the address
     * (0 when unset) and the port.
     */
    public int hashCode() {
        int addressHash = (address == null) ? 0 : address.hashCode();
        return 31 * addressHash + port;
    }

    /**
     * Reads the port of this id from the given array as an unsigned short.
     * The address is not part of the encoded form and is left untouched.
     *
     * @param data the array to read from
     * @param pos  the offset of the encoded id within data
     */
    public void readFromByteArray(byte data[], int pos) {
        try {
            port = SerializerUtil.readUShortFrom(data, pos);
        } catch (Exception ignored) {
            // Best effort, as before: on failure the port keeps its previous
            // value. NOTE(review): confirm callers are fine with that.
        }
    }

    /**
     * @return a printable form of this id; an unset address prints as "null"
     */
    public String toString() {
        String host = (address == null) ? "null" : address.getHostAddress();
        return "UDPNodeId:[" + host + ":" + port + "]";
    }

    /**
     * Writes the port of this id into the given array as an unsigned short.
     * The address is not written.
     *
     * @param data the array to write into
     * @param pos  the offset at which to write
     */
    public void writeToByteArray(byte data[], int pos) {
        SerializerUtil.writeUShortTo(port, data, pos);
    }
}