User: hiram   
  Date: 01/02/27 20:35:02

  Modified:    src/main/org/jbossmq/cluster/udp Datagram.java
                        UDPNodeId.java UDPTransport.java
  Added:       src/main/org/jbossmq/cluster/udp UDPAdminStream.java
                        UDPStream.java
  Removed:     src/main/org/jbossmq/cluster/udp AdminStream.java
                        NormalStream.java
  Log:
  We can now send point-to-point or broadcast messages in the cluster.  Refactored a lot 
of the UDP implementation.
  
  Revision  Changes    Path
  1.3       +60 -53    jbossmq/src/main/org/jbossmq/cluster/udp/Datagram.java
  
  Index: Datagram.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/udp/Datagram.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- Datagram.java     2001/02/22 06:12:29     1.2
  +++ Datagram.java     2001/02/28 04:35:00     1.3
  @@ -1,7 +1,7 @@
   /*
    * JBossMQ, the OpenSource JMS implementation
    *
  - * Distributable under GPL license.
  + * Distributable under LGPL license.
    * See terms of license at gnu.org.
    */
   package org.jbossmq.cluster.udp;
  @@ -17,30 +17,55 @@
    *
    * @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - * @version $Revision: 1.2 $
  + * @version $Revision: 1.3 $
    */
   class Datagram {
   
  -     // MESSAGE FLAGS
  -     final static int DROPPABLE_MESSAGE_FLAG = 0;
  -     final static int KEEP_ORDER_MESSAGE_FLAG = 1;
  -
        // Field offsets in the header of the message
        final static int ID_OFFSET = 0; // int
        final static int LENGTH_OFFSET = 4; // int
        final static int FRAGMENT_ID_OFFSET = 8; // short
  -     final static int SENDER_ID_OFFSET = 10; // 2 bytes
  -     final static int TOPIC_ID_OFFSET = 12; // short
  -     final static byte MESSAGE_FLAGS_OFFSET = 14; // byte
  -     final static int HEADER_SIZE = 15;
  +     final static int SENDER_ID_OFFSET = 10; // 4 bytes
  +     final static int TOPIC_ID_OFFSET = 14; // short
  +     final static byte MESSAGE_FLAGS_OFFSET = 16; // byte
  +     final static int HEADER_SIZE = 17;
   
  +     // MESSAGE FLAGS
  +     public final static byte DROPPABLE_FLAG  = 1;
  +     public final static byte KEEP_ORDER_FLAG = 2;
  +     public final static byte ADMIN_FLAG = 4;
  +
        // the serialized message 
        byte data[];
  -     // if this is part of a fragment, next fragment should
  -     // point to the next fragment.
  +     // Who sent this message
  +     UDPNodeId senderId;
  +     // nextFragment can be used to form a linked list of Datagrams
        Datagram nextFragment;
   
        /**
  +      *  This is used to build a new datagram
  +      */
  +     Datagram( int length,
  +             short fragmentId,
  +             UDPNodeId senderId,
  +             short topic,
  +             byte messageFlags,
  +             byte payload[],
  +             int offset,
  +             int len) {
  +
  +             data = new byte[len + HEADER_SIZE];
  +             SerializerUtil.writeIntTo(length, data, LENGTH_OFFSET);
  +             SerializerUtil.writeShortTo(fragmentId, data, FRAGMENT_ID_OFFSET);
  +             senderId.writeToByteArray(data, SENDER_ID_OFFSET);
  +             SerializerUtil.writeShortTo(topic, data, TOPIC_ID_OFFSET);
  +             data[MESSAGE_FLAGS_OFFSET] = messageFlags;
  +
  +             for (int i = HEADER_SIZE, j = offset; i < data.length; i++, j++)
  +                     data[i] = payload[j];
  +     }
  +
  +     /**
         * This is used to reconstruct a datagram that was serialized
         */
        Datagram(byte datagram[], int length) {
  @@ -50,8 +75,6 @@
                }
        }
   
  -
  -
        /**
         * Gets the original payload of the datagram.
         * @return byte[]
  @@ -95,8 +118,6 @@
                return SerializerUtil.readIntFrom(data, LENGTH_OFFSET);
        }
   
  -
  -
        /**
         * The topic under which this message was sent.
         * @return short
  @@ -105,44 +126,6 @@
                return SerializerUtil.readShortFrom(data, TOPIC_ID_OFFSET);
        }
   
  -     boolean isDroppable() {
  -             return ((data[MESSAGE_FLAGS_OFFSET] >> DROPPABLE_MESSAGE_FLAG) & 0x01) 
!= 0;
  -     }
  -
  -     boolean isKeepOrder() {
  -             return ((data[MESSAGE_FLAGS_OFFSET] >> KEEP_ORDER_MESSAGE_FLAG) & 
0x01) != 0;
  -     }
  -
  -     UDPNodeId senderId;
  -
  -     /**
  -      *  This is used to build a new datagram
  -      */
  -     Datagram(
  -             int id,
  -             int length,
  -             short fragmentId,
  -             UDPNodeId senderId,
  -             short topic,
  -             boolean droppable,
  -             boolean keepOrder,
  -             byte payload[],
  -             int offset,
  -             int len) {
  -
  -             data = new byte[len + HEADER_SIZE];
  -             SerializerUtil.writeIntTo(id, data, ID_OFFSET);
  -             SerializerUtil.writeIntTo(length, data, LENGTH_OFFSET);
  -             SerializerUtil.writeShortTo(fragmentId, data, FRAGMENT_ID_OFFSET);
  -             senderId.writeToByteArray(data, SENDER_ID_OFFSET);
  -             SerializerUtil.writeShortTo(topic, data, TOPIC_ID_OFFSET);
  -             data[MESSAGE_FLAGS_OFFSET] =
  -                     (byte) (((droppable ? 1 : 0) << DROPPABLE_MESSAGE_FLAG) | 
((keepOrder ? 1 : 0) << KEEP_ORDER_MESSAGE_FLAG));
  -
  -             for (int i = HEADER_SIZE, j = offset; i < data.length; i++, j++)
  -                     data[i] = payload[j];
  -     }
  -
        /**
         * The id of the sender of this message.
         * @return int
  @@ -155,5 +138,29 @@
                t.readFromByteArray(data, SENDER_ID_OFFSET);
                senderId = t;
                return t;
  +     }
  +
  +     /**
  +      * returns true if all the given flags are set in the message.
  +      */
  +     boolean isMessageFlagsSet(int flags) {
  +             return (data[MESSAGE_FLAGS_OFFSET]&flags)==flags;
  +     }
  +     
  +     /**
  +      * Gets all the message flags.
  +      */
  +     byte getMessageFlags() {
  +             return data[MESSAGE_FLAGS_OFFSET];
  +     }
  +
  +     /**
  +      * The id of the message.  All the fragments message will have the
  +      * same Id, just different fragment ids.
  +      *
  +      * @return int
  +      */
  +     void setId(int id) {
  +             SerializerUtil.writeIntTo(id, data, ID_OFFSET);
        }
   }
  
  
  
  1.2       +49 -15    jbossmq/src/main/org/jbossmq/cluster/udp/UDPNodeId.java
  
  Index: UDPNodeId.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/udp/UDPNodeId.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- UDPNodeId.java    2001/02/22 06:12:29     1.1
  +++ UDPNodeId.java    2001/02/28 04:35:01     1.2
  @@ -1,56 +1,90 @@
  +/*
  + * JBossMQ, the OpenSource JMS implementation
  + *
  + * Distributable under LGPL license.
  + * See terms of license at gnu.org.
  + */
   package org.jbossmq.cluster.udp;
   
  -
  -import java.io.Externalizable;
   import java.net.InetAddress;
   import org.jbossmq.cluster.NodeId;
   import org.jbossmq.cluster.SerializerUtil;
   
   
   /**
  + * This class contains all the information needed to
  + * identify a node in the cluster at the transport level.
  + * Every node in the cluster open two new UDP sockets to
  + * receive point to point and admin messages.  That port numbers
  + * combined with the IP address of the machine provides
  + * us a the identity of the node.
    */
   public class UDPNodeId implements NodeId {
   
        InetAddress address;
        int port;
        
  -/**
  - * UDPNodeId constructor comment.
  - */
  -public UDPNodeId() {
  -     super();
  -}
  +     /**
  +      * UDPNodeId constructor comment.
  +      */
  +     public UDPNodeId() {
  +             super();
  +     }
  +
  +     /**
  +      * You can compare two UDPNodeIds
  +      */
        public boolean equals(Object obj) {
                try {
   
                        UDPNodeId o = (UDPNodeId)obj;
                        return o.address.equals(address) &&
  -                             o.port == port;                         
  +                             o.port == port &&
  +                             o.adminPort == adminPort;                              
 
                                
                } catch (Throwable e) {
                        return false;
                }
        }
  +
  +     /**
  +      * The Ip address hash should get good enough in most cases.
  +      * (Bad when you are running multiple nodes on a single machine)
  +      */
        public int hashCode() {
                return address.hashCode();
        }
  +     
        /**
  +      * This is used by the Datagram class.
  +      * The address of the UDPNodeId will be set from
  +      * the sender field of the IP packet the datagram
  +      * was received from.
         */
        public void readFromByteArray(byte data[], int pos) {
            try {
                    
                    port = SerializerUtil.readUShortFrom(data, pos);
  +                 adminPort = SerializerUtil.readUShortFrom(data, pos+2);
                    
            } catch ( Exception ignore ) {
  -         }
  -         
  -     }
  -     public String toString() {
  -             return "UDPNodeId:["+address.getHostAddress()+":"+port+"]";
  +         }   
        }
  +             
        /**
  +      * This is used by the Datagram class.
  +      * The address of the UDPNodeId does not need to be
  +      * serialized as the IP packet allready has a source
  +      * IP packet field that we will take advantage of.
         */
        public void writeToByteArray(byte data[], int pos) {
                SerializerUtil.writeUShortTo(port,data,pos);
  +             SerializerUtil.writeUShortTo(adminPort,data,pos+2);
        }
  -}
  +     
  +     public String toString() {
  +             return 
"UDPNodeId:["+address.getHostAddress()+":"+port+":"+adminPort+"]";
  +     }
  +
  +     int adminPort;
  +}
  \ No newline at end of file
  
  
  
  1.3       +310 -92   jbossmq/src/main/org/jbossmq/cluster/udp/UDPTransport.java
  
  Index: UDPTransport.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/jbossmq/src/main/org/jbossmq/cluster/udp/UDPTransport.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- UDPTransport.java 2001/02/22 06:12:29     1.2
  +++ UDPTransport.java 2001/02/28 04:35:01     1.3
  @@ -1,7 +1,7 @@
   /*
    * JBossMQ, the OpenSource JMS implementation
    *
  - * Distributable under GPL license.
  + * Distributable under LGPL license.
    * See terms of license at gnu.org.
    */
   package org.jbossmq.cluster.udp;
  @@ -13,9 +13,18 @@
   import java.util.Hashtable;
   import java.util.LinkedList;
   import java.io.IOException;
  +import java.io.ObjectOutputStream;
  +import java.io.ByteArrayOutputStream;
  +import java.io.ByteArrayInputStream;
  +import java.io.Externalizable;
  +import java.io.ObjectInputStream;
   
   import org.jbossmq.cluster.TransportListener;
   
  +import org.jbossmq.Log;
  +
  +import java.util.Vector;
  +
   /**
    * This Transport implements a Datagram based cluster transport.
    * It Can use UDP or Multicast.
  @@ -26,69 +35,41 @@
    * Since Datagrams have a chance of not making it, this class
    * also provides retransmision facilities.
    *
  + * It uses two UDPStreams.  The first stream is the broadcast stream
  + * and it receives multicast packets sent to the cluster.
  + * The second is a point stream and is used to directly send a message
  + * to this node.
  + *
    * @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - * @version $Revision: 1.2 $
  + * @version $Revision: 1.3 $
    */
   public class UDPTransport implements org.jbossmq.cluster.Transport {
   
  -     // The class responsible for pulling messages from the network
  -     NormalStream normalStream = new NormalStream();
  -     // The interface into the cluster we report message arivals to
  -     TransportListener transportListener;
  -     // We incrementaly number the datagrams we send
  -     int nextDatagramId = 0;
  -
  -     AdminStream adminStream = new AdminStream();
  +     // A reference to the cluster the transport is working with     
        org.jbossmq.cluster.Cluster cluster;
  -     // Adderes datagrams will be sent to
  -
  -     InetAddress clusterAddress;
  -     // Port datagrams will be sent to
  -     int clusterPort;
  -     // A message can be composed of multiple datagram fragments.
  +     // The interface into the cluster we report message arivals to
  +     private TransportListener transportListener;
  +     // Receives the multicast messages
  +     UDPStream broadcastStream = new UDPStream();
  +     // Receives the messages addressed to this node
  +     UDPStream pointStream = new UDPStream();
  +     // The maximum size a datagram fragment can be.
        int maxFragmentSize = 1024;
  -     // Socket that we will be sending data over.
  -     DatagramSocket normalSocket;
  +     // Is the transport started?
        boolean started = false;
   
  -     // I like simple constructors.
  +     /**
  +      * I like simple constructors.
  +      */
        public UDPTransport() {
  -             normalStream.transport = this;
  +             broadcastStream.transport = this;
  +             broadcastStream.setName("Broadcast");
  +             pointStream.transport = this;
  +             pointStream.setName("Point");
                adminStream.transport = this;
        }
   
  -     
  -     // I am assuming that the post increment is atomic, 
  -     // otherwise this needs to get synchronized
  -     int getNextDatagramId() {
  -             return nextDatagramId++;
  -     }
  -
  -     // A complete datagram has arrived to us from the InputStream
  -     // We build a object from it and send it to the TransportListener
  -     void messageArrivedEvent(Datagram dg) throws InterruptedException {
  -
  -             if (!cluster.isListeningOn(dg.getTopicId()))
  -                     return;
  -
  -             byte data[];
  -             if (dg.getFragmentCount(maxFragmentSize) == 1) {
  -                     data = dg.getData();
  -             } else {
  -                     data = new byte[dg.getLength()];
  -                     int dataPos = 0;
  -                     while (dg != null) {
  -                             for (int i = dataPos, j = Datagram.HEADER_SIZE; j < 
dg.data.length; i++, j++)
  -                                     data[i] = dg.data[j];
  -                             dataPos += dg.data.length - Datagram.HEADER_SIZE;
  -                             dg = dg.nextFragment;
  -                     }
  -             }
  -
  -             transportListener.messageArrivedEvent(dg.getTopicId(), 
dg.getSenderId(), data);
  -
  -     }
   
        // This is set when the transport is set in the cluster.
        public void setCluster(org.jbossmq.cluster.Cluster newCluster) {
  @@ -100,12 +81,6 @@
                transportListener = c;
        }
   
  -     // This called when the InputStream detects that another node
  -     // in the cluster is using our nodeId.
  -     void senderConflictEvent() {
  -             System.out.println("WARNNING: NodeId conflict detected.");
  -     }
  -
        // This method is used to initialize is
        // transport layer.
        synchronized public void setProperties(Hashtable t)
  @@ -134,31 +109,36 @@
                                InetAddress group = InetAddress.getByName(groupString);
                                int port = Integer.parseInt(portString);
   
  -                             DatagramSocket a;
  -                             MulticastSocket s = new MulticastSocket(port);
  -                             s.joinGroup(group);
  +                             DatagramSocket pointSocket;
  +                             DatagramSocket adminSocket;
  +                             MulticastSocket broadcastSocket = new 
MulticastSocket(port);
  +                             broadcastSocket.joinGroup(group);
   
                                if (nodeInterfaceString != null) {
                                        nodeId.address = 
InetAddress.getByName(nodeInterfaceString);
                                } else {
                                        nodeId.address = InetAddress.getLocalHost();
                                }
  -                             s.setInterface(nodeId.address);
  +                             broadcastSocket.setInterface(nodeId.address);
   
                                if (nodePortString != null) {
                                        nodeId.port = Integer.parseInt(nodePortString);
  -                                     a = new DatagramSocket(nodeId.port);
  +                                     pointSocket = new DatagramSocket(nodeId.port);
                                } else {
  -                                     a = new DatagramSocket();
  -                                     nodeId.port = a.getLocalPort();
  +                                     pointSocket = new DatagramSocket();
  +                                     nodeId.port = pointSocket.getLocalPort();
                                }
  -
  -                             System.out.println("Local NodeId: "+nodeId);
                                
  -                             clusterPort = port;
  -                             clusterAddress = group;
  -                             normalSocket = s;
  -                             adminSocket = a;
  +                             adminSocket = new DatagramSocket();
  +                             nodeId.adminPort = adminSocket.getLocalPort();
  +                             
  +                             Log.log("["+this+"] Local NodeId: "+nodeId);
  +
  +                             clusterId.address = group;
  +                             clusterId.port = port;
  +                             broadcastStream.socket = broadcastSocket;
  +                             pointStream.socket = pointSocket;
  +                             adminStream.socket = adminSocket;
   
                        } catch (IOException e) {
                                throw new 
org.jbossmq.cluster.InvalidConfigurationException(
  @@ -171,17 +151,19 @@
   
        // Starts the transport 
        synchronized public void start() throws 
org.jbossmq.cluster.InvalidStateException {
  -             if (normalSocket == null)
  +             if (broadcastStream.socket == null)
                        throw new org.jbossmq.cluster.InvalidStateException("The 
transport properties have not been set yet.");
   
                started = true;
  -             normalStream.start();
  +             broadcastStream.start();
  +             pointStream.start();
                adminStream.start();
        }
   
        // Stops the transport 
        synchronized public void stop() throws InterruptedException {
  -             normalStream.stop();
  +             broadcastStream.stop();
  +             pointStream.stop();
                adminStream.stop();
                started = false;
        }
  @@ -190,12 +172,260 @@
   
        // Builds a datagram fragment chain with the given data
        // and then sends it over the network
  -     public void send(short channelId, byte data[], boolean droppable, boolean 
keepOrder)
  +     public void send(short channelId, byte data[], boolean droppable, boolean 
keepOrder) throws IOException, InterruptedException {
  +             send( null, channelId, data, droppable, keepOrder);
  +     }
  +
  +
  +     // Our unique address in the cluster
  +     UDPNodeId nodeId = new UDPNodeId();
  +     // timmer Used to slow down the transmision of messages.
  +     volatile long stopSendsTill=0;
  +
  +     //
  +     // This is the message sent to request a packet be resent
  +     //
  +     static class ResendAdminDatagram implements java.io.Serializable {
  +             ResendAdminDatagram(int j[], boolean b) {
  +                     datagramIds = j;
  +                     broadcast=b;
  +             }
  +             int datagramIds[];
  +             boolean broadcast;
  +     }
  +
  +     //
  +     // This is the message sent when the requested packet is not in the cache.
  +     //
  +     static class ResendErrorAdminDatagram implements java.io.Serializable {
  +             ResendErrorAdminDatagram(int j, boolean b) {
  +                     datagramId = j;
  +                     broadcast=b;
  +             }
  +             int datagramId;
  +             boolean broadcast;
  +     }
  +
  +     // This send is used to send the administrative messages.
  +     public void adminSend(UDPNodeId dest, Object message) throws IOException, 
InterruptedException {
  +
  +             ByteArrayOutputStream bos = new ByteArrayOutputStream();
  +             ObjectOutputStream os = new ObjectOutputStream(bos);
  +             os.writeObject(message);
  +             os.close();
  +
  +             byte data[] = bos.toByteArray();
  +             Datagram dg = new Datagram(
  +                     data.length, 
  +                     (short) 0, 
  +                     nodeId,
  +                     (short) 0, 
  +                     (byte)(Datagram.ADMIN_FLAG|Datagram.DROPPABLE_FLAG), 
  +                     data, 
  +                     0, 
  +                     data.length);
  +
  +             dg.setId(adminStream.getNextDatagramId());      
  +             Log.log("["+this+"] Sending admin datagram: "+dg.getId());
  +             DatagramPacket packet =
  +             new java.net.DatagramPacket(dg.data, dg.data.length, dest.address, 
dest.adminPort);
  +             adminStream.socket.send(packet);
  +     }
  +
  +     void datagramArrived(Datagram dg) throws InterruptedException {
  +
  +             Log.log("["+this+"] A datagram arrived");
  +
  +             if( dg.isMessageFlagsSet(Datagram.ADMIN_FLAG) ) {
  +                 try {
  +
  +                     Object o;
  +                     ByteArrayInputStream bais = new 
ByteArrayInputStream(dg.getData());
  +                     ObjectInputStream is = new ObjectInputStream(bais);
  +                     o = is.readObject();
  +                     is.close();
  +
  +                     if (o instanceof ResendAdminDatagram) {
  +
  +                         ResendAdminDatagram radg = (ResendAdminDatagram) o;
  +                         for( int i=0; i < radg.datagramIds.length; i++ ) {         
     
  +                                 Log.log("["+this+"] RESEND REQUEST ARRIVED: " + 
radg.datagramIds[i]);
  +                                 resendDatagram(dg.getSenderId(), 
radg.datagramIds[i], radg.broadcast);
  +                         }
  +                     } else if (o instanceof ResendErrorAdminDatagram) {
  +
  +                         ResendErrorAdminDatagram readg = 
(ResendErrorAdminDatagram) o;
  +                         Log.log("["+this+"] RESEND REQUEST FAILED: " + 
readg.datagramId);
  +                         if (readg.broadcast) {
  +                             
broadcastStream.removeFromDatagramStream(dg.getSenderId(), readg.datagramId);
  +                         } else {
  +                             pointStream.removeFromDatagramStream(dg.getSenderId(), 
readg.datagramId);
  +                         }
  +
  +                     }
  +
  +                 } catch (ClassNotFoundException e) {
  +                     e.printStackTrace();
  +                 } catch (IOException e) {
  +                     e.printStackTrace();
  +                 }
  +             } else {
  +
  +                     // A complete datagram has arrived to us from the InputStream
  +                     // We build a object from it and send it to the 
TransportListener
  +                     if (!cluster.isListeningOn(dg.getTopicId()))
  +                             return;
  +
  +                     byte data[];
  +                     if (dg.getFragmentCount(maxFragmentSize) == 1) {
  +                             data = dg.getData();
  +                     } else {
  +                             data = new byte[dg.getLength()];
  +                             int dataPos = 0;
  +                             while (dg != null) {
  +                                     for (int i = dataPos, j = 
Datagram.HEADER_SIZE; j < dg.data.length; i++, j++)
  +                                             data[i] = dg.data[j];
  +                                     dataPos += dg.data.length - 
Datagram.HEADER_SIZE;
  +                                     dg = dg.nextFragment;
  +                             }
  +                     }
  +
  +                     transportListener.messageArrivedEvent(dg.getTopicId(), 
dg.getSenderId(), data);
  +             }
  +     }
  +
  +
  +
  +     // Resends the message if it was found in the cache
  +     void resendDatagram(UDPNodeId requestor, int messageId, boolean broadcast) 
throws InterruptedException, java.io.IOException {
  +
  +             slowDownSendRate();
  +             if( broadcast ) {
  +                     Datagram dg = broadcastStream.getFromSentCache(messageId);
  +                     if(dg!=null) {
  +                             Log.log("["+this+"] BROADCAST RESEND REQUEST SERVICED: 
" + messageId);
  +                             send(dg, false);
  +                             return;
  +                     } else {
  +                             Log.log("["+this+"] BROADCAST RESEND REQUEST NOT 
SERVICED (not in cache): " + messageId);
  +                     }
  +             } else {
  +                     Datagram dg = pointStream.getFromSentCache(messageId);
  +                     if(dg!=null) {
  +                             Log.log("["+this+"] POINT RESEND REQUEST SERVICED: " + 
messageId);
  +                             send(requestor, dg, false);
  +                             return;
  +                     } else {
  +                             Log.log("["+this+"] POINT RESEND REQUEST NOT SERVICED 
(not in cache): " + messageId);
  +                     }
  +             }
  +             
  +             ResendErrorAdminDatagram error = new 
ResendErrorAdminDatagram(messageId,broadcast);
  +             adminSend(requestor, error);
  +     }
  +
  +     // This is set when the transport is set in the cluster.
  +     public void slowDownSendRate() {
  +             stopSendsTill = System.currentTimeMillis()+1000;
  +     }
  +
  +
  +     // Sends a datagram fragment chain over the network
  +     synchronized void send(Datagram dg) throws InterruptedException, 
java.io.IOException {
  +             send( dg, true );
  +     }
  +
  +     // Sends a datagram fragment chain over the network
  +     synchronized void send(UDPNodeId dest, Datagram dg) throws 
InterruptedException, java.io.IOException {
  +             send( dest, dg, true );
  +     }
  +
  +     // The broadcast address of the cluster 
  +     UDPNodeId clusterId = new UDPNodeId();
  +
  +     // Sends a datagram fragment chain over the network
  +     synchronized void send(Datagram dg, boolean setDatagramId) throws 
InterruptedException, java.io.IOException {
  +             
  +             int id=0;
  +             if( setDatagramId ) {
  +                     id = broadcastStream.getNextDatagramId();
  +                     dg.setId(id);
  +             }
  +
  +             broadcastStream.addToSentCache(dg);
  +
  +             // Send the fragment chain.
  +             Log.log("["+this+"] Sending Broadcast datagram: "+dg.getId());
  +             while (dg != null) {
  +                     
  +                     if( setDatagramId )     
  +                             dg.setId(id);
  +                             
  +                     DatagramPacket packet =
  +                             new java.net.DatagramPacket(dg.data, dg.data.length, 
clusterId.address, clusterId.port);
  +                     broadcastStream.socket.send(packet);
  +                     dg = dg.nextFragment;
  +             }
  +     }
  +
  +     // Sends a datagram fragment chain over the network
  +     synchronized void send(UDPNodeId dest, Datagram dg, boolean setDatagramId) 
throws InterruptedException, java.io.IOException {
  +             int id=0;
  +             if( setDatagramId ) {
  +                     id = broadcastStream.getNextDatagramId();
  +                     dg.setId(id);
  +             }
  +             
  +             pointStream.addToSentCache(dg);
  +             
  +             Log.log("["+this+"] Sending Point datagram: "+dg.getId());
  +             // Send the fragment chain.
  +             while (dg != null) {
  +                     if( setDatagramId )     
  +                             dg.setId(id);
  +                             
  +                     DatagramPacket packet =
  +                             new java.net.DatagramPacket(dg.data, dg.data.length, 
dest.address, dest.port);
  +                     pointStream.socket.send(packet);
  +                     dg = dg.nextFragment;
  +             }
  +     }
  +
  +     // Receives admin messages addressed to this node
  +     UDPAdminStream adminStream = new UDPAdminStream();
  +
  +     // Sends a resend request
  +     void requestResend(UDPNodeId nodeId, Vector messageIds, boolean broadcast) {
  +
  +             int MAX_RESEND_ARRAY_SIZE = 100;
  +             int msgs[] = new int[ messageIds.size()<MAX_RESEND_ARRAY_SIZE ? 
messageIds.size() : MAX_RESEND_ARRAY_SIZE ];
  +             for( int i=0; i<msgs.length; i++ ) {
  +                     msgs[i] = ((Integer)messageIds.elementAt(i)).intValue();
  +             }
  +                     
  +             Log.log("["+this+"] SENDING RESEND REQUEST: " + messageIds);
  +             ResendAdminDatagram m = new ResendAdminDatagram(msgs,broadcast);
  +             try {
  +                     adminSend(nodeId, m);
  +             } catch (Exception e) {
  +                     e.printStackTrace();
  +             }
  +
  +     }
  +
  +     // Builds a datagram fragment chain with the given data
  +     // and then sends it over the network
  +     public void send(org.jbossmq.cluster.NodeId dest, short channelId, byte 
data[], boolean droppable, boolean keepOrder)
                throws IOException, InterruptedException {
  -             int id = getNextDatagramId();
                int length = data.length;
                short fragmentId = 0;
  +             byte msgFlags=0;
   
  +             if( droppable )
  +                     msgFlags = (byte)(msgFlags | Datagram.DROPPABLE_FLAG);
  +             if( keepOrder )
  +                     msgFlags = (byte)(msgFlags | Datagram.KEEP_ORDER_FLAG);
  +
                Datagram firstFragment = null;
                Datagram lastFragment = null;
   
  @@ -215,13 +445,11 @@
                                // This a MAX size fragment
                                dg =
                                        new Datagram(
  -                                             id,
                                                length,
                                                fragmentId,
                                                nodeId,
                                                channelId,
  -                                             droppable,
  -                                             keepOrder,
  +                                             msgFlags,
                                                data,
                                                fragmentId * maxFragmentSize,
                                                maxFragmentSize);
  @@ -229,13 +457,11 @@
                                // This a not a MAX size fragment
                                dg =
                                        new Datagram(
  -                                             id,
                                                length,
                                                fragmentId,
                                                nodeId,
                                                channelId,
  -                                             droppable,
  -                                             keepOrder,
  +                                             msgFlags,
                                                data,
                                                fragmentId * maxFragmentSize,
                                                length - (fragmentId * 
maxFragmentSize));
  @@ -252,17 +478,9 @@
                }
   
                // Send the fragment chain.
  -             normalStream.send(firstFragment);
  -     }
  -
  -     // Socket that we will be sending admin data over.
  -     DatagramSocket adminSocket;
  -     UDPNodeId nodeId = new UDPNodeId();
  -     // timmer Used to slow down the transmision of messages.
  -     volatile long stopSendsTill=0;
  -
  -     // This is set when the transport is set in the cluster.
  -     public void slowDownSendRate() {
  -             stopSendsTill = System.currentTimeMillis()+500;
  +             if( dest == null ) 
  +                     send(firstFragment);
  +             else
  +                     send( (UDPNodeId)dest, firstFragment);
        }
   }
  
  
  
  1.1                  jbossmq/src/main/org/jbossmq/cluster/udp/UDPAdminStream.java
  
  Index: UDPAdminStream.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.cluster.udp;
  
  import java.net.DatagramSocket;
  import java.net.DatagramPacket;
  import java.util.LinkedList;
  import java.util.Iterator;
  import java.util.HashMap;
  
  import org.jbossmq.Log;
  
  import java.util.Vector;
  
  /**
   * The UDPAdminStream class blocks on a UDP socket
   * waiting for admin packets.
   *
   * An admin packet must fit into a single fragment. No
   * packet recovery is needed.
   *
   * @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   * @version $Revision: 1.1 $
   */
  class UDPAdminStream implements Runnable {
  
        // The channel manager owning this Inbound channel
        UDPTransport transport;
        // The thread that is reciving messages.
        private Thread runningThread;
        // Used to stop the thread
        private boolean done = false;
        // We incrementaly number the datagrams we send
        private int nextDatagramId = 0;
        // The socket we will be listening on
        DatagramSocket socket;
  
  
        /**
         * Starts the input thread
         */
        synchronized void start() {
                if (runningThread != null)
                        return;
                done = false;
                runningThread = new Thread(this, "UDPAdminStream");
                runningThread.start();
        }
  
        /**
         * Stops the input thread
         */
        synchronized void stop() throws InterruptedException {
                if (runningThread == null)
                        return;
                done = true;
                runningThread.interrupt();
                runningThread.join();
                runningThread = null;
        }
  
  
        /**
         * The thread of the InboundStream
         */
        public void run() {
  
                try {
  
                        byte buffer[] = new byte[transport.maxFragmentSize + 
Datagram.HEADER_SIZE];
                        DatagramPacket packet = new java.net.DatagramPacket(buffer, 
buffer.length);
  
                        // We iterate at least every 2 seconds so that we can
                        // stop the thread
                        socket.setSoTimeout(2000);
  
                        while (!done) {
  
                                try {
  
                                        // Read in a message from the network
                                        packet.setData(buffer);
                                        packet.setLength(buffer.length);
                                        socket.receive(packet);
  
                                        // Is the packet ok?
                                        if (packet.getLength() < Datagram.HEADER_SIZE) 
{
                                                Log.notice("["+this+"] Packet was too 
small, dropping.");
                                                continue;
                                        }
  
                                        // Build the Datagram object form the network 
data
                                        Datagram dg = new Datagram(packet.getData(), 
packet.getLength());
                                        dg.getSenderId().address = packet.getAddress();
  
                                        // Drop the packet if this node sent this 
packet
                                        if (transport.nodeId.equals(dg.getSenderId())) 
{
                                                continue;
                                        }
  
                                        // Used to test the packet retransmision 
faclities
                                        // of the UDPTransport class.  We drop 1 in 10 
packets.
                                        java.util.Random r = new java.util.Random();
                                        if (r.nextInt(100) < 10) {
                                                Log.error("["+this+"] TEST DROP: " + 
dg.getId());
                                                continue;
                                        }
  
                                        // Continue processing the datagram
                                        fragmentArrived(dg);
  
                                } catch (java.io.InterruptedIOException e) {
                                } catch (InterruptedException e) {
                                }
                        }
  
                } catch (java.io.IOException e) {
                        e.printStackTrace();
                }
        }
  
  
        /**
         * This places the datagram in the proper position in the
         * message "Stream"
         */
        synchronized private void fragmentArrived(Datagram dg) throws 
InterruptedException {
                transport.datagramArrived(dg);
        }
  
        /** 
         * Returns the next message id in the output stream.
         */
        protected int getNextDatagramId() {
                // I think ++ is atomic operation in Java.. (No synch needed)
                return nextDatagramId++;
        }
  
        public String toString() {
                return "org.jbossmq.cluster.udp.UDPAdminStream";
        }
  }
  
  
  1.1                  jbossmq/src/main/org/jbossmq/cluster/udp/UDPStream.java
  
  Index: UDPStream.java
  ===================================================================
  /*
   * JBossMQ, the OpenSource JMS implementation
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  package org.jbossmq.cluster.udp;
  
  import java.net.DatagramSocket;
  import java.net.DatagramPacket;
  import java.util.LinkedList;
  import java.util.Iterator;
  import java.util.HashMap;
  
  import org.jbossmq.Log;
  
  import java.util.Vector;
  
  /**
   * The UDPStream class blocks on a UDP socket
   * waiting for input.  Since a message is made up 
   * of multiple fragments and UDP does not guarantee message
   * delivery, we keep track of the "Stream" of messages
   * being sent by all the nodes in the cluster.
   *
   * The "Stream" is a linked list of MessageState objects.
   * The head of the list is the oldest message that has not been completed
   * yet.  The tail of the list is the most recently received message from
   * the node.  Messages between the tail and head end could be completely
   * received or in progress.  Messages marked as "not required to be in order"
   * are sent to the cluster to be processed by the listeners as soon as they are 
   * completed, and removed from the "Stream".  When the head message is completed
   * it is sent up to the cluster to be processed and it is removed from the "stream".
   * When the head message is != to the tail message, this means that a datagram
   * was dropped and the head message needs to be resent, so a resend request is
   * issued.
   * When a new message arrives it is placed at the end of the message stream but
   * filler messages will be added to the stream to fill the space left by any dropped
   * messages.
   *
   * @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   * @version $Revision: 1.1 $
   */
  class UDPStream implements Runnable {
  
        // The maximum size the datagram input stream can grow to.
        private static final int MAX_DATAGRAM_STREAM_SIZE = 100;
  
        // The channel manager owning this Inbound channel
        UDPTransport transport;
        // The thread that is reciving messages.
        private Thread runningThread;
        // Used to stop the thread
        private boolean done = false;
        // Up to how many sent packets do we cache?
        private int maxSentCacheSize = 200;
        // The name this stream.  Used to ease debugging.
        public java.lang.String name;
        // We incrementaly number the datagrams we send
        private int nextDatagramId = 0;
        // Used to for error recovery.
        private LinkedList sentCache = new LinkedList();
        // The socket we will be listening on
        DatagramSocket socket;
        
        // Maps UDPNodeIds to NodeStates
        private HashMap nodes = new HashMap(10);
        
        // Keeps track of the progress of message reception
        private static class MessageState {
                MessageState(int id) {
                        this.id = id;
                }
                int id;
                boolean arrived = false;
                boolean isDropable = false;
                long lastResendRequest = 0;
                Datagram[] fragments;
        }
        // Keeps track of the state of a remote nodes in the cluster
        private static class NodeState {
                UDPNodeId nodeId;
                LinkedList datagramStream = new LinkedList();
                int lastDatagramId = 0;
                boolean pastFirstPacket = false;
        }
  
        /**
         * Gets the MessageState object for the given Node/Message Id
         * This will create new MessageState objects as needed.
         */
        private MessageState getMessageState(NodeState nodeState, int id) {
                boolean rollOver = id < -5000 && nodeState.lastDatagramId > 5000;
  
                if (!nodeState.pastFirstPacket) {
  
                        MessageState t = new MessageState(id);
                        nodeState.datagramStream.addLast(t);
                        nodeState.lastDatagramId = id;
                        nodeState.pastFirstPacket = true;
                        return t;
  
                } else if (nodeState.lastDatagramId == id) {
                        
                        // This is weird.  A node might have retransmitted a packet 
for a
                        // message that was succesfully recived the first time 
(therefore
                        // we are not pending messages for it in the datagramStream)
                        if( nodeState.datagramStream.size()==0 )
                                return null;
  
                        // The packet id is the same as the last packet.
                        return (MessageState) nodeState.datagramStream.getLast();
  
                } else if (nodeState.lastDatagramId < id || rollOver) {
  
                        // The packet id is newer than the last packet
                        int i = nodeState.lastDatagramId;
                        while (i != id) {
                                i++;
                                // Packets got dropped but we record them in the 
stream anyways.
                                MessageState t = new MessageState(i);
                                nodeState.datagramStream.addLast(t);
  
                                if (nodeState.datagramStream.size() > 
MAX_DATAGRAM_STREAM_SIZE) {
                                        handleNodeUnresponsive(nodeState);
                                        nodeState.datagramStream.removeFirst();
                                }
                        }
                        nodeState.lastDatagramId = id;
                        return (MessageState) nodeState.datagramStream.getLast();
  
                } else {
  
                        // The packet id older than the last packet.  We have to 
search for it
                        // int the datagram stream.
                        Iterator i = nodeState.datagramStream.iterator();
                        while (i.hasNext()) {
                                MessageState ds = (MessageState) i.next();
                                if (ds.id == id)
                                        return ds;
                        }
  
                        // This is weird.  A node might have retransmitted a packet 
for a
                        // message that was succesfully recived the first time 
(therefore
                        // we are not pending messages for it in the datagramStream)
                        return null;
  
                }
        }
  
        // We should do something else to inform the cluster that
        // a node is being unresponsive.  
        // This happens when the message stream gets too long.
        // TODO: make it also happen when a message sits too long on
        // the stream.
        private void handleNodeUnresponsive(NodeState nodeState) {
                Log.error("NODE IS UNRESPONSIVE!!!");
        }
  
        /**
         * Starts the input thread
         */
        synchronized void start() {
                if (runningThread != null)
                        return;
                done = false;
                runningThread = new Thread(this, "UDPStream-"+name);
                runningThread.start();
        }
  
        /**
         * Stops the input thread
         */
        synchronized void stop() throws InterruptedException {
                if (runningThread == null)
                        return;
                done = true;
                runningThread.interrupt();
                runningThread.join();
                runningThread = null;
        }
  
  
        /**
         * The thread of the InboundStream
         */
        public void run() {
  
                try {
  
                        byte buffer[] = new byte[transport.maxFragmentSize + 
Datagram.HEADER_SIZE];
                        DatagramPacket packet = new java.net.DatagramPacket(buffer, 
buffer.length);
  
                        // We iterate at least every 500 miliseconds so that we can
                        // dispatch resend request messages frequently
                        socket.setSoTimeout(500);
  
                        while (!done) {
  
                                try {
  
                                        // We might be doing this too often..
                                        synchronized( this ) {
                                                Iterator i = nodes.values().iterator();
                                                while( i.hasNext() ) {
                                                        NodeState nodeState = 
(NodeState)i.next();
                                                        dispatchDatagrams(nodeState);
                                                }
                                        }
  
                                        // Read in a message from the network
                                        packet.setData(buffer);
                                        packet.setLength(buffer.length);
                                        socket.receive(packet);
  
                                        // Is the packet ok?
                                        if (packet.getLength() < Datagram.HEADER_SIZE) 
{
                                                Log.notice("["+this+"] Packet was too 
small, dropping.");
                                                continue;
                                        }
  
                                        // Build the Datagram object form the network 
data
                                        Datagram dg = new Datagram(packet.getData(), 
packet.getLength());
                                        dg.getSenderId().address = packet.getAddress();
  
                                        // Drop the packet if this node sent this 
packet
                                        if (transport.nodeId.equals(dg.getSenderId())) 
{
                                                continue;
                                        }
  
                                        // Used to test the packet retransmision 
faclities
                                        // of the UDPTransport class.  We drop 1 in 10 
packets.
                                        java.util.Random r = new java.util.Random();
                                        if (r.nextInt(100) < 10) {
                                                Log.error("["+this+"] TEST DROP: " + 
dg.getId());
                                                continue;
                                        }
  
                                        // Continue processing the datagram
                                        fragmentArrived(dg);
  
                                } catch (java.io.InterruptedIOException e) {
                                } catch (InterruptedException e) {
                                }
                        }
  
                } catch (java.io.IOException e) {
                        e.printStackTrace();
                }
        }
  
  
        /**
         * Adds a packet to the sent packet cache
         * Removes the oldest packet if the cache is full.
         */
        void addToSentCache(Datagram dg) {
                
                if( dg.isMessageFlagsSet(Datagram.DROPPABLE_FLAG) )
                        return;
                synchronized (sentCache) {
                        sentCache.addFirst(dg);
                        if (sentCache.size() > maxSentCacheSize)
                                sentCache.removeLast();
                }
        }
  
        /**
         * This method does the bulk of the "Stream" managment.
         * - it delivers messages that are complete.
         * - drops messages that are incomplete and droppable.
         * - Requests retransmision of the incomplete packet at the
         *   head of the stream.
         */
        private void dispatchDatagrams(NodeState nodeState) throws 
InterruptedException {
  
                if (nodeState.datagramStream.size() == 0)
                        return;
  
                boolean atFront = true;
                Object lastDS = nodeState.datagramStream.getLast();
                Iterator i = nodeState.datagramStream.iterator();
                MessageState firstMissing = null;
                Vector missingList = new Vector();
                while (i.hasNext()) {
  
                        MessageState ds = (MessageState) i.next();
  
                        if (ds.arrived) {
                                if (atFront || 
!(ds.fragments[0].isMessageFlagsSet(Datagram.KEEP_ORDER_FLAG)) ) {
                                        transport.datagramArrived(ds.fragments[0]);
                                        i.remove();
                                }
                                continue;
                        } else {
                                atFront = false;
  
                                if (ds == lastDS) {
                                        continue;
                                } else if (ds.isDropable) {
                                        i.remove();
                                } else {
                                        if (firstMissing == null) {
                                                firstMissing = ds;
                                        }
                                        missingList.addElement(new Integer(ds.id));
                                }
                        }
                }
  
                if ( firstMissing != null && System.currentTimeMillis() - 
firstMissing.lastResendRequest > 200 ) {
                        transport.requestResend(nodeState.nodeId, 
missingList,this==transport.broadcastStream);
                        firstMissing.lastResendRequest = System.currentTimeMillis();
                }
        }
  
        /**
         * This places the datagram in the proper position in the
         * message "Stream"
         */
        synchronized private void fragmentArrived(Datagram dg) throws 
InterruptedException {
        
                UDPNodeId senderId = dg.getSenderId();
                int id = dg.getId();
                short fragmentId = dg.getFragmentId();
                Log.log("["+this+"] Processing datagram: " + id);
                
                NodeState nodeState = getNodeState(senderId);
                MessageState ds = getMessageState(nodeState, id);
  
                // We might have allready received this message.
                if (ds == null || ds.arrived)
                        return;
  
                if (dg.getFragmentCount(transport.maxFragmentSize) == 1) {
                        // The entire message was contained within this datagram
                        ds.arrived = true;
                        ds.fragments = new Datagram[1];
                        ds.fragments[0] = dg;
                                
                } else {
                        // The message is fragmented.
                        if (ds.fragments == null) {
                                ds.fragments = new 
Datagram[dg.getFragmentCount(transport.maxFragmentSize)];
                                ds.isDropable = 
dg.isMessageFlagsSet(Datagram.DROPPABLE_FLAG);
                        }
  
                        ds.fragments[fragmentId] = dg;
  
                        // Check to see if we have recived all the fragments?
                        boolean messageLoaded = true;
                        // iterating backwards will break us out of the loop 
                        // quicker in the common case
                        for (int i = ds.fragments.length - 1; i >= 0; i--) {
                                if (ds.fragments[i] == null) {
                                        messageLoaded = false;
                                        break;
                                }
                        }
  
                        if (messageLoaded) {
                                // chain the fragments together
                                for (int i = 0; i < ds.fragments.length - 1; i++)
                                        ds.fragments[i].nextFragment = ds.fragments[i 
+ 1];
  
                                ds.arrived = true;
                        }
                }
        }
  
        /**
         * Looks for a packet in the sent cache.
         * returns null if the packet could not be found.
         */
        public Datagram getFromSentCache(int messageId) {
                LinkedList t;
                synchronized (sentCache) {
                        t = (LinkedList) sentCache.clone();
                }
                java.util.Iterator i = t.iterator();
                while (i.hasNext()) {
                        Datagram dg = (Datagram) i.next();
                        if (dg.getId() == messageId) {
                                return dg;
                        }
                }
                return null;
        }
  
        /** 
         * Returns the next message id in the output stream.
         */
        protected int getNextDatagramId() {
                // I think ++ is atomic operation in Java.. (No synch needed)
                return nextDatagramId++;
        }
  
        /**
         * Gets the NodeState for the given node id.
         * if none exists, a new NodeState object is created.
         */
        private NodeState getNodeState(UDPNodeId nodeId) {
                NodeState rc;
                rc = (NodeState)nodes.get( nodeId );
                if (rc==null) {
                        rc = new NodeState();
                        rc.nodeId = nodeId;
                        nodes.put(nodeId, rc);
                }
                return (NodeState)rc;
        }
  
        /**
         * This is used by the AdminPacketHandler.  Called when
         * a packet cannot be retransmited by the host (It was droppable or
         * packet was not in the packet cache).
         *
         * We mark the packet as droppable so it gets removed from the stream
         * the next time the messages are dispatch on the stream.
         */
        synchronized void removeFromDatagramStream(UDPNodeId nodeId, int datagramId) {
                NodeState nodeState = getNodeState(nodeId);
  
                Iterator i = nodeState.datagramStream.iterator();
                while (i.hasNext()) {
                        MessageState ds = (MessageState) i.next();
                        if (ds.id == datagramId)
                                ds.isDropable = true;
                }
                
        }
  
        /**
         * This is a descriptive name for the stream.  Used of debuging messages
         */
        public void setName(java.lang.String newName) {
                name = newName;
        }
  
        public String toString() {
                return "org.jbossmq.cluster.udp.UDPStream:"+name;
        }
  }
  
  

Reply via email to