User: hiram
Date: 00/11/17 09:30:28
Modified: src/java/org/spydermq JMSServerQueue.java SpyMessage.java
Log:
Sometimes message would get sent out of order. This would
happen whenever there was an error send the message (like
a NoReceiverException). The errored message would get
put at the back of the queue. Fixed that and added better support
for priority messages (for example: a priorty 2 message now
jumps ahead of a priorty 1 message.
Revision Changes Path
1.16 +118 -118 spyderMQ/src/java/org/spydermq/JMSServerQueue.java
Index: JMSServerQueue.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/JMSServerQueue.java,v
retrieving revision 1.15
retrieving revision 1.16
diff -u -r1.15 -r1.16
--- JMSServerQueue.java 2000/11/14 05:54:08 1.15
+++ JMSServerQueue.java 2000/11/17 17:30:27 1.16
@@ -12,16 +12,16 @@
import java.util.Hashtable;
import java.util.LinkedList;
import java.util.HashMap;
+import java.util.TreeSet;
/**
- * This class is a message queue which is stored (hashed by Destination) on the
JMS provider
+ *This class is a message queue which is stored (hashed by Destination) on the JMS
provider
*
- * @author Norbert Lataille ([EMAIL PROTECTED])
+ *@author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.15 $
+ *@version $Revision: 1.16 $
*/
-public class JMSServerQueue
-{
+public class JMSServerQueue {
// Attributes ----------------------------------------------------
//the Destination of this queue
@@ -29,7 +29,7 @@
//DistributedConnection objs that have "registered" to this Destination
private HashMap subscribers;
//List of Pending messages
- private LinkedList messages;
+ private TreeSet messages;
//Is a thread already working on this queue ?
//You cannot start two threads on the same destination (correct order of msgs)
private boolean threadWorking;
@@ -45,11 +45,14 @@
private LinkedList messagesWaitingForAck;
//Nb of listeners for this Queue
int listeners;
+ //Counter used to number incomming messages. (Used to order the messages.)
+ long messageIdCounter = Long.MIN_VALUE;
- // Should we use the round robin aproach to pick the next reciver of a p2p
message?
- private boolean useRoundRobinMessageDistribution = true;
- // Keeps track of the last used connection so that we can do round robin
distribution of p2p messages.
- private SpyDistributedConnection lastUsedConnection;
+ // Should we use the round robin aproach to pick the next reciver of a p2p
message?
+ private boolean useRoundRobinMessageDistribution = true;
+ // Keeps track of the last used connection so that we can do round robin
distribution of p2p messages.
+ private SpyDistributedConnection lastUsedConnection;
+
// Constructor ---------------------------------------------------
@@ -57,7 +60,7 @@
{
destination=dest;
subscribers=new HashMap();
- messages=new LinkedList();
+ messages=new TreeSet();
threadWorking=false;
alreadyInTaskQueue=false;
temporaryDestination=temporary;
@@ -102,22 +105,10 @@
synchronized (messages)
{
//Add the message to the queue
-
- //Get the priority
- int pri=mes.getJMSPriority();
-
- if (pri<=4) {
- //normal priority message
- messages.addLast(mes);
- } else {
- //expedited priority message
- int size=messages.size();
- int i=0;
- for(;i<size;i++) {
- if
(((SpyMessage)messages.get(i)).getJMSPriority()<pri) break;
- }
- messages.add(i,mes);
- }
+ //messages is now an ordered tree. The order depends
+ //first on the priority and then on messageId
+ mes.messageId = messageIdCounter++;
+ messages.add(mes);
if (isTopic) {
//if a thread is already working on this destination,
I don't have to myself to the taskqueue
@@ -156,7 +147,9 @@
alreadyInTaskQueue=false;
if (messages.size()==0) return null;
- return (SpyMessage)messages.removeFirst();
+ SpyMessage m = (SpyMessage)messages.first();
+ messages.remove(m);
+ return m;
}
}
@@ -246,79 +239,82 @@
//remove this connection from the list
removeSubscriber(dc,i);
+ }
+
+ /**
+ * Get a SpyDistributedConnection object that is listening
+ * to this queue. If multiple objects are listening to the queue
+ * this multiple calls to this method will cycle through them in a round
+ * robin fasion.
+ */
+ private SpyDistributedConnection pickNextRoundRobinConnection() {
+
+ // No valid next connection will exist, return null
+ if (listeners == 0)
+ return null;
+
+ Iterator i = subscribers.values().iterator();
+ SpyDistributedConnection firstFoundConnection = null;
+ boolean enableSelectNext = false;
+
+ while (i.hasNext()) {
+ SpyDistributedConnection t = (SpyDistributedConnection)
i.next();
+
+ // Select the next valid connection if we are past the last
used connection
+ if (t == lastUsedConnection || lastUsedConnection == null)
+ enableSelectNext = true;
+
+ // Test to see if the connection is valid pick
+ if (t.listeners) {
+ // Store the first valid connection since the last
used might be the last
+ // in the list
+ if (firstFoundConnection == null)
+ firstFoundConnection = t;
+
+ // Are we past the last used? then we have the next
item in the round robin
+ if (enableSelectNext && t != lastUsedConnection) {
+ lastUsedConnection = t;
+ return t;
+ }
+ }
+ }
+
+ // We got here because we did not find a valid item in the list after
the last
+ // used item, so lest use the first valid item
+ if (firstFoundConnection != null) {
+ lastUsedConnection = firstFoundConnection;
+ return firstFoundConnection;
+ } else {
+ Log.error("FIXME: The listeners count was invalid !");
+ return null;
+ }
}
-
- /**
- * Get a SpyDistributedConnection object that is listening
- * to this queue. If multiple objects are listening to the queue
- * this multiple calls to this method will cycle through them in a round
- * robin fasion.
- */
- private SpyDistributedConnection pickNextRoundRobinConnection() {
-
- // No valid next connection will exist, return null
- if (listeners==0) return null;
-
- Iterator i=subscribers.values().iterator();
- SpyDistributedConnection firstFoundConnection=null;
- boolean enableSelectNext = false;
-
- while (i.hasNext()) {
- SpyDistributedConnection t =(SpyDistributedConnection)i.next();
-
- // Select the next valid connection if we are past the last used
connection
- if( t == lastUsedConnection || lastUsedConnection == null )
- enableSelectNext = true;
-
- // Test to see if the connection is valid pick
- if (t.listeners) {
- // Store the first valid connection since the last used might be
the last
- // in the list
- if( firstFoundConnection == null )
- firstFoundConnection = t;
-
- // Are we past the last used? then we have the next item in the
round robin
- if( enableSelectNext && t!=lastUsedConnection ) {
- lastUsedConnection = t;
- return t;
- }
- }
- }
- // We got here because we did not find a valid item in the list after
the last
- // used item, so lest use the first valid item
- if( firstFoundConnection != null ) {
- lastUsedConnection = firstFoundConnection;
- return firstFoundConnection;
- } else {
- Log.error("FIXME: The listeners count was invalid !");
- return null;
- }
- }
-
- /**
- * Get a SpyDistributedConnection object that is listening
- * to this queue. Picks the first one it can find.
- */
- private SpyDistributedConnection pickFirstFoundConnection() {
-
- // No valid next connection will exist, return null
- if (listeners==0) return null;
-
- Iterator i=subscribers.values().iterator();
- while (i.hasNext()) {
- SpyDistributedConnection t =(SpyDistributedConnection)i.next();
-
- // Test to see if the connection is valid pick
- if (t.listeners) {
- return t;
- }
- }
-
- // We got here because we did not find a valid item in the list.
- Log.error("FIXME: The listeners count was invalid !");
- return null;
- }
-
+
+ /**
+ * Get a SpyDistributedConnection object that is listening
+ * to this queue. Picks the first one it can find.
+ */
+ private SpyDistributedConnection pickFirstFoundConnection() {
+
+ // No valid next connection will exist, return null
+ if (listeners == 0)
+ return null;
+
+ Iterator i = subscribers.values().iterator();
+ while (i.hasNext()) {
+ SpyDistributedConnection t = (SpyDistributedConnection)
i.next();
+
+ // Test to see if the connection is valid pick
+ if (t.listeners) {
+ return t;
+ }
+ }
+
+ // We got here because we did not find a valid item in the list.
+ Log.error("FIXME: The listeners count was invalid !");
+ return null;
+ }
+
void doMyJob() throws JMSException
{
if (isTopic) {
@@ -357,19 +353,19 @@
Log.log("get a receiver");
- // we may have to restore the
lastUsedConnection
- // if message on the queue is not sent. (we
don't want to skip
- // destination in the round robin)
+ // we may have to restore the
lastUsedConnection
+ // if message on the queue is not sent. (we
don't want to skip
+ // destination in the round robin)
SpyDistributedConnection
saveLastConnection=lastUsedConnection;
SpyDistributedConnection dc;
-
- if( useRoundRobinMessageDistribution ) {
- dc=pickNextRoundRobinConnection();
- } else {
- dc=pickFirstFoundConnection();
- }
- if ( dc == null ) break;
-
+
+ if(
useRoundRobinMessageDistribution ) {
+
dc=pickNextRoundRobinConnection();
+ } else
{
+
dc=pickFirstFoundConnection();
+ }
+ if (
dc == null ) break;
+
//Get the message ( if there is one message
pending )
SpyMessage mes=startWorkQueue();
if (mes==null) {
@@ -390,7 +386,9 @@
Log.log("There was no receiver for the
client "+dc);
try {
- addMessage(mes);
+ synchronized (messages) {
+ messages.add(mes);
+ }
connectionListening(false,dc);
} catch (Exception e2) {
Log.error(e2);
@@ -400,10 +398,11 @@
throw e;
} catch (Exception e) {
//This is a transport failure. We
should define our own Transport Failure class
- //for a better execption catching
-
+ //for a better execption catching
try {
- addMessage(mes);
+ synchronized (messages) {
+ messages.add(mes);
+ }
} catch (Exception e2) {
Log.error(e2);
}
@@ -426,7 +425,9 @@
{
synchronized (messages) {
if (messages.size()==0) return null;
- return (SpyMessage)messages.removeFirst();
+ SpyMessage m = (SpyMessage)messages.first();
+ messages.remove(m);
+ return m;
}
}
@@ -463,6 +464,5 @@
Log.log("Listeners for "+destination+" = "+listeners);
}
- }
-
+ }
}
1.6 +66 -54 spyderMQ/src/java/org/spydermq/SpyMessage.java
Index: SpyMessage.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyMessage.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- SpyMessage.java 2000/07/07 22:37:21 1.5
+++ SpyMessage.java 2000/11/17 17:30:27 1.6
@@ -16,22 +16,16 @@
import java.util.Hashtable;
import java.io.Serializable;
-/**
- * This class implements javax.jms.Message
- *
- * @author Norbert Lataille ([EMAIL PROTECTED])
- *
- * @version $Revision: 1.5 $
- */
+import java.lang.Comparable;
public class SpyMessage
- implements Serializable, Cloneable, Message
+ implements Serializable, Cloneable, Message, Comparable
{
// Constants -----------------------------------------------------
static final int DEFAULT_DELIVERY_MODE = -1;
- static final int DEFAULT_PRIORITY = -1;
- static final int DEFAULT_TIME_TO_LIVE = -1;
+ static final int DEFAULT_PRIORITY = -1;
+ static final int DEFAULT_TIME_TO_LIVE = -1;
// Attributes ----------------------------------------------------
@@ -87,129 +81,129 @@
return jmsMessageID;
}
- public void setJMSMessageID(String id) throws JMSException
+ public void setJMSMessageID(String id) throws JMSException
{
jmsMessageID=id;
}
- public long getJMSTimestamp() throws JMSException
+ public long getJMSTimestamp() throws JMSException
{
return jmsTimeStamp;
}
- public void setJMSTimestamp(long timestamp) throws JMSException
+ public void setJMSTimestamp(long timestamp) throws JMSException
{
jmsTimeStamp=timestamp;
}
- public byte [] getJMSCorrelationIDAsBytes() throws JMSException
+ public byte [] getJMSCorrelationIDAsBytes() throws JMSException
{
if (jmsCorrelationID) throw new JMSException("JMSCorrelationID is a
string");
return jmsCorrelationIDbyte;
}
- public void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException
+ public void setJMSCorrelationIDAsBytes(byte[] correlationID) throws
JMSException
{
jmsCorrelationID=false;
jmsCorrelationIDbyte=(byte[])correlationID.clone();
jmsCorrelationIDString=null;
}
- public void setJMSCorrelationID(String correlationID) throws JMSException
+ public void setJMSCorrelationID(String correlationID) throws JMSException
{
jmsCorrelationID=true;
jmsCorrelationIDString=correlationID;
jmsCorrelationIDbyte=null;
}
- public String getJMSCorrelationID() throws JMSException
+ public String getJMSCorrelationID() throws JMSException
{
if (!jmsCorrelationID) throw new JMSException("JMSCorrelationID is an
array");
return jmsCorrelationIDString;
}
- public Destination getJMSReplyTo() throws JMSException
+ public Destination getJMSReplyTo() throws JMSException
{
return jmsReplyTo;
}
- public void setJMSReplyTo(Destination replyTo) throws JMSException
+ public void setJMSReplyTo(Destination replyTo) throws JMSException
{
jmsReplyTo=replyTo;
}
- public Destination getJMSDestination() throws JMSException
+ public Destination getJMSDestination() throws JMSException
{
return jmsDestination;
}
- public void setJMSDestination(Destination destination) throws JMSException
+ public void setJMSDestination(Destination destination) throws JMSException
{
jmsDestination=destination;
}
- public int getJMSDeliveryMode() throws JMSException
+ public int getJMSDeliveryMode() throws JMSException
{
return jmsDeliveryMode;
}
- public void setJMSDeliveryMode(int deliveryMode) throws JMSException
+ public void setJMSDeliveryMode(int deliveryMode) throws JMSException
{
jmsDeliveryMode=deliveryMode;
}
- public boolean getJMSRedelivered() throws JMSException
+ public boolean getJMSRedelivered() throws JMSException
{
return jmsRedelivered;
}
- public void setJMSRedelivered(boolean redelivered) throws JMSException
+ public void setJMSRedelivered(boolean redelivered) throws JMSException
{
jmsRedelivered=redelivered;
}
- public String getJMSType() throws JMSException
+ public String getJMSType() throws JMSException
{
return jmsType;
}
- public void setJMSType(String type) throws JMSException
+ public void setJMSType(String type) throws JMSException
{
jmsType=type;
}
- public long getJMSExpiration() throws JMSException
+ public long getJMSExpiration() throws JMSException
{
return jmsExpiration;
}
- public void setJMSExpiration(long expiration) throws JMSException
+ public void setJMSExpiration(long expiration) throws JMSException
{
jmsExpiration=expiration;
}
- public int getJMSPriority() throws JMSException
+ public int getJMSPriority() throws JMSException
{
return jmsPriority;
}
- public void setJMSPriority(int priority) throws JMSException
+ public void setJMSPriority(int priority) throws JMSException
{
jmsPriority=priority;
}
- public void clearProperties() throws JMSException
+ public void clearProperties() throws JMSException
{
prop=new Hashtable();
propReadWrite=true;
}
- public boolean propertyExists(String name) throws JMSException
+ public boolean propertyExists(String name) throws JMSException
{
return prop.containsKey(name);
}
- public boolean getBooleanProperty(String name) throws JMSException
+ public boolean getBooleanProperty(String name) throws JMSException
{
Object value=prop.get(name);
if (value==null) throw new NullPointerException();
@@ -219,7 +213,7 @@
else throw new MessageFormatException("Invalid conversion");
}
- public byte getByteProperty(String name) throws JMSException
+ public byte getByteProperty(String name) throws JMSException
{
Object value=prop.get(name);
if (value==null) throw new NullPointerException();
@@ -229,7 +223,7 @@
else throw new MessageFormatException("Invalid conversion");
}
- public short getShortProperty(String name) throws JMSException
+ public short getShortProperty(String name) throws JMSException
{
Object value=prop.get(name);
if (value==null) throw new NullPointerException();
@@ -240,7 +234,7 @@
else throw new MessageFormatException("Invalid conversion");
}
- public int getIntProperty(String name) throws JMSException
+ public int getIntProperty(String name) throws JMSException
{
Object value=prop.get(name);
if (value==null) throw new NullPointerException();
@@ -252,7 +246,7 @@
else throw new MessageFormatException("Invalid conversion");
}
- public long getLongProperty(String name) throws JMSException
+ public long getLongProperty(String name) throws JMSException
{
Object value=prop.get(name);
if (value==null) throw new NullPointerException();
@@ -265,7 +259,7 @@
else throw new MessageFormatException("Invalid conversion");
}
- public float getFloatProperty(String name) throws JMSException
+ public float getFloatProperty(String name) throws JMSException
{
Object value=prop.get(name);
if (value==null) throw new NullPointerException();
@@ -275,7 +269,7 @@
else throw new MessageFormatException("Invalid conversion");
}
- public double getDoubleProperty(String name) throws JMSException
+ public double getDoubleProperty(String name) throws JMSException
{
Object value=prop.get(name);
if (value==null) throw new NullPointerException();
@@ -286,7 +280,7 @@
else throw new MessageFormatException("Invalid conversion");
}
- public String getStringProperty(String name) throws JMSException
+ public String getStringProperty(String name) throws JMSException
{
Object value=prop.get(name);
if (value==null) return null;
@@ -302,13 +296,13 @@
else throw new MessageFormatException("Invalid conversion");
}
- public Object getObjectProperty(String name) throws JMSException
+ public Object getObjectProperty(String name) throws JMSException
{
Object value=prop.get(name);
return value;
}
- public Enumeration getPropertyNames() throws JMSException
+ public Enumeration getPropertyNames() throws JMSException
{
return prop.keys();
}
@@ -327,63 +321,63 @@
}
- public void setBooleanProperty(String name, boolean value) throws JMSException
+ public void setBooleanProperty(String name, boolean value) throws JMSException
{
CheckPropertyName(name);
if (!propReadWrite) throw new MessageNotWriteableException("Properties
are read-only");
prop.put(name,new Boolean(value));
}
- public void setByteProperty(String name, byte value) throws JMSException
+ public void setByteProperty(String name, byte value) throws JMSException
{
CheckPropertyName(name);
if (!propReadWrite) throw new MessageNotWriteableException("Properties
are read-only");
prop.put(name,new Byte(value));
}
- public void setShortProperty(String name, short value) throws JMSException
+ public void setShortProperty(String name, short value) throws JMSException
{
CheckPropertyName(name);
if (!propReadWrite) throw new MessageNotWriteableException("Properties
are read-only");
prop.put(name,new Short(value));
}
- public void setIntProperty(String name, int value) throws JMSException
+ public void setIntProperty(String name, int value) throws JMSException
{
CheckPropertyName(name);
if (!propReadWrite) throw new MessageNotWriteableException("Properties
are read-only");
prop.put(name,new Integer(value));
}
- public void setLongProperty(String name, long value) throws JMSException
+ public void setLongProperty(String name, long value) throws JMSException
{
CheckPropertyName(name);
if (!propReadWrite) throw new MessageNotWriteableException("Properties
are read-only");
prop.put(name,new Long(value));
}
- public void setFloatProperty(String name, float value) throws JMSException
+ public void setFloatProperty(String name, float value) throws JMSException
{
CheckPropertyName(name);
if (!propReadWrite) throw new MessageNotWriteableException("Properties
are read-only");
prop.put(name,new Float(value));
}
- public void setDoubleProperty(String name, double value) throws JMSException
+ public void setDoubleProperty(String name, double value) throws JMSException
{
CheckPropertyName(name);
if (!propReadWrite) throw new MessageNotWriteableException("Properties
are read-only");
prop.put(name,new Double(value));
}
- public void setStringProperty(String name, String value) throws JMSException
+ public void setStringProperty(String name, String value) throws JMSException
{
CheckPropertyName(name);
if (!propReadWrite) throw new MessageNotWriteableException("Properties
are read-only");
prop.put(name,new String(value));
}
- public void setObjectProperty(String name, Object value) throws JMSException
+ public void setObjectProperty(String name, Object value) throws JMSException
{
CheckPropertyName(name);
if (!propReadWrite) throw new MessageNotWriteableException("Properties
are read-only");
@@ -399,7 +393,7 @@
else throw new MessageFormatException("Invalid object type");
}
- public void clearBody() throws JMSException
+ public void clearBody() throws JMSException
{
//Inherited classes clear their content here
msgReadOnly=false;
@@ -442,4 +436,22 @@
return jmsExpiration<ts;
}
-}
+ //For ordering in the JMSServerQueue
+ public transient long messageId; /**
+ * Return a negative number if this message should be sent
+ * before the o message. Return a positive if should be sent
+ * after the o message.
+ */
+ public int compareTo(Object o) {
+
+ SpyMessage sm = (SpyMessage)o;
+
+ if( jmsPriority > sm.jmsPriority) {
+ return -1;
+ }
+ if( jmsPriority < sm.jmsPriority ) {
+ return 1;
+ }
+ return (int)(messageId - sm.messageId);
+
+ } }