User: hiram   
  Date: 00/11/17 09:30:28

  Modified:    src/java/org/spydermq JMSServerQueue.java SpyMessage.java
  Log:
  Sometimes messages would get sent out of order.  This would
  happen whenever there was an error sending the message (like
  a NoReceiverException).  The errored message would get
  put at the back of the queue.  Fixed that and added better support
  for priority messages (for example: a priority 2 message now
  jumps ahead of a priority 1 message).
  
  Revision  Changes    Path
  1.16      +118 -118  spyderMQ/src/java/org/spydermq/JMSServerQueue.java
  
  Index: JMSServerQueue.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/JMSServerQueue.java,v
  retrieving revision 1.15
  retrieving revision 1.16
  diff -u -r1.15 -r1.16
  --- JMSServerQueue.java       2000/11/14 05:54:08     1.15
  +++ JMSServerQueue.java       2000/11/17 17:30:27     1.16
  @@ -12,16 +12,16 @@
   import java.util.Hashtable;
   import java.util.LinkedList;
   import java.util.HashMap;
  +import java.util.TreeSet;
   
   /**
  - *   This class is a message queue which is stored (hashed by Destination) on the 
JMS provider
  + *This class is a message queue which is stored (hashed by Destination) on the JMS 
provider
    *      
  - *   @author Norbert Lataille ([EMAIL PROTECTED])
  + *@author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.15 $
  + *@version $Revision: 1.16 $
    */
  -public class JMSServerQueue
  -{
  +public class JMSServerQueue {
        // Attributes ----------------------------------------------------
   
        //the Destination of this queue
  @@ -29,7 +29,7 @@
        //DistributedConnection objs that have "registered" to this Destination
        private HashMap subscribers;
        //List of Pending messages
  -     private LinkedList messages;
  +     private TreeSet messages;
        //Is a thread already working on this queue ? 
        //You cannot start two threads on the same destination (correct order of msgs)
        private boolean threadWorking;
  @@ -45,11 +45,14 @@
        private LinkedList messagesWaitingForAck;
        //Nb of listeners for this Queue
        int listeners;
  +     //Counter used to number incomming messages. (Used to order the messages.)
  +     long messageIdCounter = Long.MIN_VALUE;
   
  -        // Should we use the round robin aproach to pick the next reciver of a p2p 
message?
  -        private boolean useRoundRobinMessageDistribution = true;
  -        // Keeps track of the last used connection so that we can do round robin 
distribution of p2p messages.
  -        private SpyDistributedConnection lastUsedConnection;
  +     // Should we use the round robin aproach to pick the next reciver of a p2p 
message?
  +     private boolean useRoundRobinMessageDistribution = true;
  +     // Keeps track of the last used connection so that we can do round robin 
distribution of p2p messages.
  +     private SpyDistributedConnection lastUsedConnection;
  +
        
        // Constructor ---------------------------------------------------
           
  @@ -57,7 +60,7 @@
        {
                destination=dest;
                subscribers=new HashMap();
  -             messages=new LinkedList();
  +             messages=new TreeSet();
                threadWorking=false;
                alreadyInTaskQueue=false;
                temporaryDestination=temporary;
  @@ -102,22 +105,10 @@
                synchronized (messages) 
                {                       
                        //Add the message to the queue
  -
  -                     //Get the priority
  -                     int pri=mes.getJMSPriority();                   
  -                     
  -                     if (pri<=4) {                           
  -                             //normal priority message
  -                             messages.addLast(mes);                          
  -                     } else {                                
  -                             //expedited priority message
  -                             int size=messages.size();
  -                             int i=0;                                
  -                             for(;i<size;i++) {
  -                                     if 
(((SpyMessage)messages.get(i)).getJMSPriority()<pri) break;
  -                             }                               
  -                             messages.add(i,mes);
  -                     }
  +                     //messages is now an ordered tree. The order depends 
  +                     //first on the priority and then on messageId
  +                     mes.messageId = messageIdCounter++;
  +                     messages.add(mes);                              
                        
                        if (isTopic) {
                                //if a thread is already working on this destination, 
I don't have to myself to the taskqueue
  @@ -156,7 +147,9 @@
                        alreadyInTaskQueue=false;
   
                        if (messages.size()==0) return null;
  -                     return (SpyMessage)messages.removeFirst();
  +                     SpyMessage m = (SpyMessage)messages.first();
  +                     messages.remove(m);
  +                     return m;
                }
        }
        
  @@ -246,79 +239,82 @@
                
                //remove this connection from the list
                removeSubscriber(dc,i);
  +     }
  +             
  +     /**
  +      * Get a SpyDistributedConnection object that is listening 
  +      * to this queue.  If multiple objects are listening to the queue
  +      * this multiple calls to this method will cycle through them in a round 
  +      * robin fasion.
  +      */
  +     private SpyDistributedConnection pickNextRoundRobinConnection() {
  +
  +             // No valid next connection will exist, return null
  +             if (listeners == 0)
  +                     return null;
  +
  +             Iterator i = subscribers.values().iterator();
  +             SpyDistributedConnection firstFoundConnection = null;
  +             boolean enableSelectNext = false;
  +
  +             while (i.hasNext()) {
  +                     SpyDistributedConnection t = (SpyDistributedConnection) 
i.next();
  +
  +                     // Select the next valid connection if we are past the last 
used connection
  +                     if (t == lastUsedConnection || lastUsedConnection == null)
  +                             enableSelectNext = true;
  +
  +                     // Test to see if the connection is valid pick
  +                     if (t.listeners) {
  +                             // Store the first valid connection since the last 
used might be the last
  +                             // in the list 
  +                             if (firstFoundConnection == null)
  +                                     firstFoundConnection = t;
  +
  +                             // Are we past the last used? then we have the next 
item in the round robin  
  +                             if (enableSelectNext && t != lastUsedConnection) {
  +                                     lastUsedConnection = t;
  +                                     return t;
  +                             }
  +                     }
  +             }
  +
  +             // We got here because we did not find a valid item in the list after 
the last
  +             // used item, so lest use the first valid item 
  +             if (firstFoundConnection != null) {
  +                     lastUsedConnection = firstFoundConnection;
  +                     return firstFoundConnection;
  +             } else {
  +                     Log.error("FIXME: The listeners count was invalid !");
  +                     return null;
  +             }
        }
  -        
  -        /**
  -         * Get a SpyDistributedConnection object that is listening 
  -         * to this queue.  If multiple objects are listening to the queue
  -         * this multiple calls to this method will cycle through them in a round 
  -         * robin fasion.
  -         */
  -        private SpyDistributedConnection pickNextRoundRobinConnection() {           
 
  -             
  -            // No valid next connection will exist, return null
  -            if (listeners==0) return null;
  -            
  -            Iterator i=subscribers.values().iterator();
  -            SpyDistributedConnection firstFoundConnection=null;            
  -            boolean enableSelectNext = false;
  -
  -            while (i.hasNext()) {
  -                SpyDistributedConnection  t =(SpyDistributedConnection)i.next();    
                                 
  -
  -             // Select the next valid connection if we are past the last used 
connection
  -             if( t == lastUsedConnection || lastUsedConnection == null )
  -                     enableSelectNext = true;
  -
  -             // Test to see if the connection is valid pick
  -                if (t.listeners) {
  -                 // Store the first valid connection since the last used might be 
the last
  -                    // in the list 
  -                    if( firstFoundConnection == null )
  -                        firstFoundConnection = t;
  -
  -                    // Are we past the last used? then we have the next item in the 
round robin  
  -                    if( enableSelectNext && t!=lastUsedConnection ) {
  -                        lastUsedConnection = t;
  -                        return t;
  -                    }
  -                }
  -            }
  -            // We got here because we did not find a valid item in the list after 
the last
  -            // used item, so lest use the first valid item 
  -            if( firstFoundConnection != null ) {
  -                lastUsedConnection = firstFoundConnection;
  -                return firstFoundConnection;
  -            } else {
  -                Log.error("FIXME: The listeners count was invalid !");
  -                return null;
  -            }
  -        }
  -
  -        /**
  -         * Get a SpyDistributedConnection object that is listening 
  -         * to this queue.  Picks the first one it can find.
  -         */
  -        private SpyDistributedConnection pickFirstFoundConnection() {            
  -             
  -            // No valid next connection will exist, return null
  -            if (listeners==0) return null;
  -            
  -            Iterator i=subscribers.values().iterator();
  -            while (i.hasNext()) {
  -                SpyDistributedConnection  t =(SpyDistributedConnection)i.next();    
                                 
  -
  -             // Test to see if the connection is valid pick
  -                if (t.listeners) {
  -                    return t;
  -                }
  -            }
  -            
  -            // We got here because we did not find a valid item in the list.
  -            Log.error("FIXME: The listeners count was invalid !");
  -            return null;
  -        }
  -        
  +
  +     /**
  +      * Get a SpyDistributedConnection object that is listening 
  +      * to this queue.  Picks the first one it can find.
  +      */
  +     private SpyDistributedConnection pickFirstFoundConnection() {
  +
  +             // No valid next connection will exist, return null
  +             if (listeners == 0)
  +                     return null;
  +
  +             Iterator i = subscribers.values().iterator();
  +             while (i.hasNext()) {
  +                     SpyDistributedConnection t = (SpyDistributedConnection) 
i.next();
  +
  +                     // Test to see if the connection is valid pick
  +                     if (t.listeners) {
  +                             return t;
  +                     }
  +             }
  +
  +             // We got here because we did not find a valid item in the list.
  +             Log.error("FIXME: The listeners count was invalid !");
  +             return null;
  +     }
  +             
        void doMyJob() throws JMSException 
        {                       
                if (isTopic) {                  
  @@ -357,19 +353,19 @@
                                        
                                        Log.log("get a receiver");
                                        
  -                                        // we may have to restore the 
lastUsedConnection
  -                                        // if message on the queue is not sent. (we 
don't want to skip 
  -                                        // destination in the round robin)
  +                                     // we may have to restore the 
lastUsedConnection
  +                                     // if message on the queue is not sent. (we 
don't want to skip 
  +                                     // destination in the round robin)
                                        SpyDistributedConnection 
saveLastConnection=lastUsedConnection;
                                        SpyDistributedConnection dc;
  -                                        
  -                                        if( useRoundRobinMessageDistribution ) {
  -                                            dc=pickNextRoundRobinConnection();
  -                                        } else {
  -                                            dc=pickFirstFoundConnection();
  -                                        }
  -                                        if ( dc == null ) break;
  -                                        
  +                                                                             
  +                                                                             if( 
useRoundRobinMessageDistribution ) {
  +                                                                                    
 dc=pickNextRoundRobinConnection();
  +                                                                             } else 
{
  +                                                                                    
 dc=pickFirstFoundConnection();
  +                                                                             }
  +                                                                             if ( 
dc == null ) break;
  +                                                                             
                                        //Get the message ( if there is one message 
pending )
                                        SpyMessage mes=startWorkQueue();
                                        if (mes==null) {
  @@ -390,7 +386,9 @@
                                                Log.log("There was no receiver for the 
client "+dc);
   
                                                try {
  -                                                     addMessage(mes);
  +                                                     synchronized (messages) {
  +                                                             messages.add(mes);     
 
  +                                                     }
                                                        connectionListening(false,dc);
                                                } catch (Exception e2) {
                                                        Log.error(e2);
  @@ -400,10 +398,11 @@
                                                  throw e;
                                        } catch (Exception e) {
                                                //This is a transport failure. We 
should define our own Transport Failure class
  -                                             //for a better execption catching
  -                                                                                    
   
  +                                             //for a better execption catching      
                                                                                   
                                                try {
  -                                                     addMessage(mes);
  +                                                     synchronized (messages) {
  +                                                             messages.add(mes);     
 
  +                                                     }
                                                } catch (Exception e2) {
                                                        Log.error(e2);
                                                }
  @@ -426,7 +425,9 @@
        {
                synchronized (messages) {
                        if (messages.size()==0) return null;
  -                     return (SpyMessage)messages.removeFirst();
  +                     SpyMessage m = (SpyMessage)messages.first();
  +                     messages.remove(m);
  +                     return m;
                }
        }
                
  @@ -463,6 +464,5 @@
                
                        Log.log("Listeners for "+destination+" = "+listeners);
                }               
  -     }
  -     
  +     }       
   }
  
  
  
  1.6       +66 -54    spyderMQ/src/java/org/spydermq/SpyMessage.java
  
  Index: SpyMessage.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyMessage.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- SpyMessage.java   2000/07/07 22:37:21     1.5
  +++ SpyMessage.java   2000/11/17 17:30:27     1.6
  @@ -16,22 +16,16 @@
   import java.util.Hashtable;
   import java.io.Serializable;
   
  -/**
  - *   This class implements javax.jms.Message
  - *      
  - *   @author Norbert Lataille ([EMAIL PROTECTED])
  - * 
  - *   @version $Revision: 1.5 $
  - */
  +import java.lang.Comparable;
   public class SpyMessage 
  -     implements Serializable, Cloneable, Message
  +     implements Serializable, Cloneable, Message, Comparable
   {
   
        // Constants -----------------------------------------------------
            
        static final int DEFAULT_DELIVERY_MODE = -1;
  -    static final int DEFAULT_PRIORITY = -1;
  -    static final int DEFAULT_TIME_TO_LIVE = -1;
  +     static final int DEFAULT_PRIORITY = -1;
  +     static final int DEFAULT_TIME_TO_LIVE = -1;
   
        // Attributes ----------------------------------------------------
   
  @@ -87,129 +81,129 @@
                return jmsMessageID;
        }
        
  -    public void setJMSMessageID(String id) throws JMSException
  +     public void setJMSMessageID(String id) throws JMSException
        {
                jmsMessageID=id;
        }
        
  -    public long getJMSTimestamp() throws JMSException
  +     public long getJMSTimestamp() throws JMSException
        {
                return jmsTimeStamp;
        }
        
  -    public void setJMSTimestamp(long timestamp) throws JMSException
  +     public void setJMSTimestamp(long timestamp) throws JMSException
        {
                jmsTimeStamp=timestamp;
        }
        
  -    public byte [] getJMSCorrelationIDAsBytes() throws JMSException
  +     public byte [] getJMSCorrelationIDAsBytes() throws JMSException
        {       
                if (jmsCorrelationID) throw new JMSException("JMSCorrelationID is a 
string");
                return jmsCorrelationIDbyte;
        }
        
  -    public void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException
  +     public void setJMSCorrelationIDAsBytes(byte[] correlationID) throws 
JMSException
        {
                jmsCorrelationID=false;                         
                jmsCorrelationIDbyte=(byte[])correlationID.clone();
                jmsCorrelationIDString=null;
        }
        
  -    public void setJMSCorrelationID(String correlationID) throws JMSException
  +     public void setJMSCorrelationID(String correlationID) throws JMSException
        {
                jmsCorrelationID=true;
                jmsCorrelationIDString=correlationID;
                jmsCorrelationIDbyte=null;
        }
        
  -    public String getJMSCorrelationID() throws JMSException
  +     public String getJMSCorrelationID() throws JMSException
        {
                if (!jmsCorrelationID) throw new JMSException("JMSCorrelationID is an 
array");
                return jmsCorrelationIDString;
        }
        
  -    public Destination getJMSReplyTo() throws JMSException
  +     public Destination getJMSReplyTo() throws JMSException
        {
                return jmsReplyTo;
        }
        
  -    public void setJMSReplyTo(Destination replyTo) throws JMSException
  +     public void setJMSReplyTo(Destination replyTo) throws JMSException
        {
                jmsReplyTo=replyTo;
        }
        
  -    public Destination getJMSDestination() throws JMSException
  +     public Destination getJMSDestination() throws JMSException
        {
                return jmsDestination;
        }
        
  -    public void setJMSDestination(Destination destination) throws JMSException
  +     public void setJMSDestination(Destination destination) throws JMSException
        {
                jmsDestination=destination;
        }
        
  -    public int getJMSDeliveryMode() throws JMSException
  +     public int getJMSDeliveryMode() throws JMSException
        {
                return jmsDeliveryMode;
        }
        
  -    public void setJMSDeliveryMode(int deliveryMode) throws JMSException
  +     public void setJMSDeliveryMode(int deliveryMode) throws JMSException
        {
                jmsDeliveryMode=deliveryMode;
        }
        
  -    public boolean getJMSRedelivered() throws JMSException
  +     public boolean getJMSRedelivered() throws JMSException
        {               
                return jmsRedelivered;
        }
        
  -    public void setJMSRedelivered(boolean redelivered) throws JMSException
  +     public void setJMSRedelivered(boolean redelivered) throws JMSException
        {
                jmsRedelivered=redelivered;
        }
        
  -    public String getJMSType() throws JMSException
  +     public String getJMSType() throws JMSException
        {               
                return jmsType;
        }
        
  -    public void setJMSType(String type) throws JMSException
  +     public void setJMSType(String type) throws JMSException
        {
                jmsType=type;
        }
        
  -    public long getJMSExpiration() throws JMSException
  +     public long getJMSExpiration() throws JMSException
        {               
                return jmsExpiration;
        }
        
  -    public void setJMSExpiration(long expiration) throws JMSException
  +     public void setJMSExpiration(long expiration) throws JMSException
        {
                jmsExpiration=expiration;
        }
        
  -    public int getJMSPriority() throws JMSException
  +     public int getJMSPriority() throws JMSException
        {
                return jmsPriority;
        }
        
  -    public void setJMSPriority(int priority) throws JMSException
  +     public void setJMSPriority(int priority) throws JMSException
        {
                jmsPriority=priority;
        }
        
  -    public void clearProperties() throws JMSException
  +     public void clearProperties() throws JMSException
        {
                prop=new Hashtable();
                propReadWrite=true;
        }
        
  -    public boolean propertyExists(String name) throws JMSException
  +     public boolean propertyExists(String name) throws JMSException
        {
                return prop.containsKey(name);
        }
        
  -    public boolean getBooleanProperty(String name) throws JMSException
  +     public boolean getBooleanProperty(String name) throws JMSException
        {
                Object value=prop.get(name);
                if (value==null) throw new NullPointerException();
  @@ -219,7 +213,7 @@
                else throw new MessageFormatException("Invalid conversion");           
 
        }
        
  -    public byte getByteProperty(String name) throws JMSException
  +     public byte getByteProperty(String name) throws JMSException
        {
                Object value=prop.get(name);
                if (value==null) throw new NullPointerException();
  @@ -229,7 +223,7 @@
                else throw new MessageFormatException("Invalid conversion");    
        }
        
  -    public short getShortProperty(String name) throws JMSException
  +     public short getShortProperty(String name) throws JMSException
        {
                Object value=prop.get(name);
                if (value==null) throw new NullPointerException();
  @@ -240,7 +234,7 @@
                else throw new MessageFormatException("Invalid conversion");           
 
        }
        
  -    public int getIntProperty(String name) throws JMSException
  +     public int getIntProperty(String name) throws JMSException
        {
                Object value=prop.get(name);
                if (value==null) throw new NullPointerException();
  @@ -252,7 +246,7 @@
                else throw new MessageFormatException("Invalid conversion");
        }
        
  -    public long getLongProperty(String name) throws JMSException
  +     public long getLongProperty(String name) throws JMSException
        {
                Object value=prop.get(name);
                if (value==null) throw new NullPointerException();
  @@ -265,7 +259,7 @@
                else throw new MessageFormatException("Invalid conversion");
        }
        
  -    public float getFloatProperty(String name) throws JMSException
  +     public float getFloatProperty(String name) throws JMSException
        {
                Object value=prop.get(name);
                if (value==null) throw new NullPointerException();
  @@ -275,7 +269,7 @@
                else throw new MessageFormatException("Invalid conversion");
        }
        
  -    public double getDoubleProperty(String name) throws JMSException
  +     public double getDoubleProperty(String name) throws JMSException
        {
                Object value=prop.get(name);
                if (value==null) throw new NullPointerException();
  @@ -286,7 +280,7 @@
                else throw new MessageFormatException("Invalid conversion");
        }
        
  -    public String getStringProperty(String name) throws JMSException
  +     public String getStringProperty(String name) throws JMSException
        {
                Object value=prop.get(name);
                if (value==null) return null;
  @@ -302,13 +296,13 @@
                else throw new MessageFormatException("Invalid conversion");
        }
        
  -    public Object getObjectProperty(String name) throws JMSException
  +     public Object getObjectProperty(String name) throws JMSException
        {
                Object value=prop.get(name);
                return value;
        }
        
  -    public Enumeration getPropertyNames() throws JMSException
  +     public Enumeration getPropertyNames() throws JMSException
        {
                return prop.keys();
        }
  @@ -327,63 +321,63 @@
                
        }
        
  -    public void setBooleanProperty(String name, boolean value) throws JMSException
  +     public void setBooleanProperty(String name, boolean value) throws JMSException
        {
                CheckPropertyName(name);
                if (!propReadWrite) throw new MessageNotWriteableException("Properties 
are read-only");
                prop.put(name,new Boolean(value));
        }
        
  -    public void setByteProperty(String name, byte value) throws JMSException
  +     public void setByteProperty(String name, byte value) throws JMSException
        {
                CheckPropertyName(name);
                if (!propReadWrite) throw new MessageNotWriteableException("Properties 
are read-only");
                prop.put(name,new Byte(value));
        }
        
  -    public void setShortProperty(String name, short value) throws JMSException
  +     public void setShortProperty(String name, short value) throws JMSException
        {
                CheckPropertyName(name);
                if (!propReadWrite) throw new MessageNotWriteableException("Properties 
are read-only");
                prop.put(name,new Short(value));
        }
        
  -    public void setIntProperty(String name, int value) throws JMSException
  +     public void setIntProperty(String name, int value) throws JMSException
        {
                CheckPropertyName(name);
                if (!propReadWrite) throw new MessageNotWriteableException("Properties 
are read-only");
                prop.put(name,new Integer(value));
        }
        
  -    public void setLongProperty(String name, long value) throws JMSException
  +     public void setLongProperty(String name, long value) throws JMSException
        {
                CheckPropertyName(name);
                if (!propReadWrite) throw new MessageNotWriteableException("Properties 
are read-only");
                prop.put(name,new Long(value));
        }
        
  -    public void setFloatProperty(String name, float value) throws JMSException
  +     public void setFloatProperty(String name, float value) throws JMSException
        {
                CheckPropertyName(name);
                if (!propReadWrite) throw new MessageNotWriteableException("Properties 
are read-only");
                prop.put(name,new Float(value));
        }
        
  -    public void setDoubleProperty(String name, double value) throws JMSException
  +     public void setDoubleProperty(String name, double value) throws JMSException
        {
                CheckPropertyName(name);
                if (!propReadWrite) throw new MessageNotWriteableException("Properties 
are read-only");
                prop.put(name,new Double(value));
        }
        
  -    public void setStringProperty(String name, String value) throws JMSException
  +     public void setStringProperty(String name, String value) throws JMSException
        {
                CheckPropertyName(name);
                if (!propReadWrite) throw new MessageNotWriteableException("Properties 
are read-only");
                prop.put(name,new String(value));
        }
        
  -    public void setObjectProperty(String name, Object value) throws JMSException
  +     public void setObjectProperty(String name, Object value) throws JMSException
        {
                CheckPropertyName(name);
                if (!propReadWrite) throw new MessageNotWriteableException("Properties 
are read-only");
  @@ -399,7 +393,7 @@
                else throw new MessageFormatException("Invalid object type");          
                 
        }
        
  -    public void clearBody() throws JMSException
  +     public void clearBody() throws JMSException
        {
                //Inherited classes clear their content here
                msgReadOnly=false;
  @@ -442,4 +436,22 @@
                return jmsExpiration<ts;
        }
                        
  -}
  +     //For ordering in the JMSServerQueue
  +     public transient long messageId;        /**
  +      * Return a negative number if this message should be sent
  +      * before the o message. Return a positive if should be sent
  +      * after the o message.
  +      */
  +     public int compareTo(Object o) {
  +             
  +             SpyMessage sm = (SpyMessage)o;
  +             
  +             if( jmsPriority > sm.jmsPriority) {
  +                     return -1;
  +             }
  +             if( jmsPriority < sm.jmsPriority ) {
  +                     return 1;
  +             }
  +             return (int)(messageId - sm.messageId);
  +             
  +     }      }
  
  
  

Reply via email to