User: hiram
Date: 00/11/19 11:59:59
Modified: src/java/org/spydermq JMSServer.java JMSServerQueue.java
SpyConnection.java SpyDistributedConnection.java
SpyMessage.java SpyMessageConsumer.java
SpyQueueReceiver.java SpyQueueSession.java
SpySession.java SpyTopicSession.java
SpyTopicSubscriber.java
Added: src/java/org/spydermq JMSServerQueueReceiver.java
SpyAcknowledgementItem.java
Removed: src/java/org/spydermq ConnectionQueue.java SessionQueue.java
Log:
Committing several changes:
- Removed ConnectionQueue and SessionQueue. All consumer management is now done in
SpyConnection.
- Unacknowledged messages are maintained on the server side now.
(JMSServerQueueReceiver)
- Acknowledgment messages are sent to the server from the client.
- Optimized the OIL and UIL transports by caching the DistributedConnection on the
JMSServer when the ConnectionReceiver is set up.
- Cleaned up the OIL by only using the Object(Output/Input) streams instead of both
Object(Output/Input) and Buffered(Output/Input) streams.
- A QueueReceiver now does a request for a single message on a receive() method
instead of turning listening on/off to get a message.
- For the OIL and UIL, a connection failure/termination is now handled gracefully
(if connectionClosing() was called, no errors are shown; if it was not called and the
connection failed, we call it).
Revision Changes Path
1.9 +103 -87 spyderMQ/src/java/org/spydermq/JMSServer.java
Index: JMSServer.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/JMSServer.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- JMSServer.java 2000/08/25 02:30:23 1.8
+++ JMSServer.java 2000/11/19 19:59:56 1.9
@@ -22,14 +22,14 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.8 $
+ * @version $Revision: 1.9 $
*/
public class JMSServer
implements Runnable, JMSServerMBean
{
// Constants -----------------------------------------------------
-
+
//number of threads in the pool (TO DO: this value should be dynamic)
final int NB_THREADS=1;
public static final String OBJECT_NAME = "JMS:service=JMSServer";
@@ -49,18 +49,18 @@
//The security manager
SecurityManager securityManager;
- /**
- * <code>true</code> when the server is running. <code>false</code> when the
- * server should stop running.
- */
- private boolean alive = true;
-
- /**
- * Because there can be a delay between killing the JMS service and the
- * service actually dying, this field is used to tell external classes
- * that that server has actually stopped.
- */
- private boolean stopped = true;
+ /**
+ * <code>true</code> when the server is running. <code>false</code> when the
+ * server should stop running.
+ */
+ private boolean alive = true;
+
+ /**
+ * Because there can be a delay between killing the JMS service and the
+ * service actually dying, this field is used to tell external classes
+ * that that server has actually stopped.
+ */
+ private boolean stopped = true;
// Constructor ---------------------------------------------------
@@ -82,19 +82,19 @@
lastTemporaryTopic=1;
this.securityManager=securityManager;
- }
+ }
- /**
- * Returns <code>false</code> if the JMS server is currently
- * running and handling requests, <code>true</code> otherwise.
- *
- * @return <code>false</code> if the JMS server is currently
- * running and handling requests, <code>true</code>
- * otherwise.
- */
- public boolean isStopped() {
- return this.stopped;
- }
+ /**
+ * Returns <code>false</code> if the JMS server is currently
+ * running and handling requests, <code>true</code> otherwise.
+ *
+ * @return <code>false</code> if the JMS server is currently
+ * running and handling requests, <code>true</code>
+ * otherwise.
+ */
+ public boolean isStopped() {
+ return this.stopped;
+ }
// Public --------------------------------------------------------
@@ -102,59 +102,58 @@
//We should let threads cycle through the JMSServerQueue list, and
synchronized on the queue they are working on.
public void run() {
- while (alive) {
- JMSServerQueue queue = null;
+ while (alive) {
+ JMSServerQueue queue = null;
- this.stopped = false;
+ this.stopped = false;
- //Wait (and sleep) until it can find something to do
- synchronized (taskQueue) {
- while (queue == null && alive) {
-
- // size() is O(1) in LinkedList...
- int size=taskQueue.size();
- if (size!=0) {
-
- //<DEBUG>
- queue = (JMSServerQueue)taskQueue.removeFirst();
- //queue=(JMSServerQueue)taskQueue.getFirst();
- //</DEBUG>
-
- //One other thread can start working on the task
queue...
- if (size > 1) {
- taskQueue.notify();
- }
- } else {
- try {
- Log.log("I'm going to bed...");
- taskQueue.wait(5000);
- Log.log("I wake up");
- } catch (InterruptedException e) {
- }
- }
-
- }
- }
-
- if (alive) {
- //Ask the queue to do its job
- try {
- queue.doMyJob();
- } catch (JMSException e) {
- Log.error(e);
- }
- }
- }
- Log.log("JMS service stopped.");
- this.stopped = true;
- }
-
- public void stopServer() {
- this.alive = false;
- }
+ //Wait (and sleep) until it can find something to do
+ synchronized (taskQueue) {
+ while (queue == null && alive) {
+
+ // size() is O(1) in LinkedList...
+ int size=taskQueue.size();
+ if (size!=0) {
+
+ //<DEBUG>
+ queue =
(JMSServerQueue)taskQueue.removeFirst();
+
//queue=(JMSServerQueue)taskQueue.getFirst();
+ //</DEBUG>
+
+ //One other thread can start
working on the task queue...
+ if (size > 1) {
+ taskQueue.notify();
+ }
+ } else {
+ try {
+ //Log.log("I'm going
to bed...");
+ taskQueue.wait(5000);
+ //Log.log("I wake up");
+ } catch (InterruptedException
e) {
+ }
+ }
+
+ }
+ }
+
+ if (alive) {
+ //Ask the queue to do its job
+ try {
+ queue.doMyJob();
+ } catch (JMSException e) {
+ Log.error(e);
+ }
+ }
+ }
+ Log.log("JMS service stopped.");
+ this.stopped = true;
+ }
- // Administration calls
-
+ public void stopServer() {
+ this.alive = false;
+ }
+
+ // Administration calls
public SpyTopic newTopic(String name) throws JMSException
{
Log.notice("[JMSServer] new topic : "+name);
@@ -216,7 +215,7 @@
}
//A connection has send a new message
- public void newMessage(SpyMessage val[],String id) throws JMSException
+ public void newMessage(SpyMessage val[],String id) throws JMSException
{//Warning, a part is in the OIL file
if (val.length!=1) Log.notice("INCOMING: "+val.length+" messages from
"+id);
@@ -230,7 +229,7 @@
//Add the message to the queue
queue.addMessage(val[i]);
}
- }
+ }
//A connection object wants to subscribe to a Destination
public void subscribe(Destination dest,SpyDistributedConnection dc) throws
JMSException
@@ -300,6 +299,8 @@
//A connection is closing [error or notification]
public synchronized void connectionClosing(SpyDistributedConnection
dc,JMSServerQueue noCheck)
{
+ Log.log("connectionClosing(dc="+dc+",noCheck="+noCheck+")");
+
if (dc==null) return;
//unregister its clientID
@@ -350,15 +351,7 @@
securityManager.addClientID(ID);
}
- public SpyMessage queueReceiveNoWait(Queue queue) throws JMSException
- {
- Log.log("JMSserver: queueReceiveNoWait(queue="+queue+")");
-
- JMSServerQueue serverQueue=(JMSServerQueue)messageQueue.get(queue);
- if (serverQueue==null) throw new JMSException("This destination does
not exist !");
-
- return serverQueue.queueReceiveNoWait();
- }
+
public void connectionListening(boolean mode,Destination
dest,SpyDistributedConnection dc) throws JMSException
{
@@ -373,4 +366,27 @@
Log.error("JMSServer.finalize()");
}
+ //Sent by a client to Ack or Nack a message.
+ public void acknowledge(SpyAcknowledgementItem[] items, boolean isAck,
SpyDistributedConnection dc) throws JMSException
+ {
+
+ for( int i=0; i<items.length; i++ ) {
+
+ JMSServerQueue
serverQueue=(JMSServerQueue)messageQueue.get(items[i].jmsDestination);
+ if (serverQueue==null) throw new JMSException("Destination
does not exist: "+items[i].jmsDestination);
+
+ serverQueue.acknowledge(dc, items[i].jmsMessageID, isAck);
+ }
+ }
+
+ //Used by QueueReceivers for receive(), receive(long wait), and receiveNoWait()
+ public SpyMessage queueReceive(Queue queue, long wait,SpyDistributedConnection
dc) throws JMSException
+ {
+ Log.log("JMSserver: queueReceive(queue="+queue+",wait="+wait+")");
+ JMSServerQueue serverQueue=(JMSServerQueue)messageQueue.get(queue);
+ if (serverQueue==null) throw new JMSException("This destination does
not exist !");
+
+ return serverQueue.queueReceive(wait,dc);
+
+ }
}
1.17 +231 -170 spyderMQ/src/java/org/spydermq/JMSServerQueue.java
Index: JMSServerQueue.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/JMSServerQueue.java,v
retrieving revision 1.16
retrieving revision 1.17
diff -u -r1.16 -r1.17
--- JMSServerQueue.java 2000/11/17 17:30:27 1.16
+++ JMSServerQueue.java 2000/11/19 19:59:56 1.17
@@ -19,7 +19,7 @@
*
*@author Norbert Lataille ([EMAIL PROTECTED])
*
- *@version $Revision: 1.16 $
+ *@version $Revision: 1.17 $
*/
public class JMSServerQueue {
// Attributes ----------------------------------------------------
@@ -41,21 +41,19 @@
private JMSServer server;
//Am I a queue or a topic
boolean isTopic;
- //List of messages waiting for acknowledgment
- private LinkedList messagesWaitingForAck;
+
//Nb of listeners for this Queue
int listeners;
//Counter used to number incomming messages. (Used to order the messages.)
long messageIdCounter = Long.MIN_VALUE;
+ // Keeps track of the last used connection so that we can do round robin
distribution of p2p messages.
+ private JMSServerQueueReceiver lastUsedQueueReceiver;
// Should we use the round robin aproach to pick the next reciver of a p2p
message?
private boolean useRoundRobinMessageDistribution = true;
- // Keeps track of the last used connection so that we can do round robin
distribution of p2p messages.
- private SpyDistributedConnection lastUsedConnection;
- // Constructor ---------------------------------------------------
-
+ // Constructor ---------------------------------------------------
JMSServerQueue(SpyDestination dest,SpyDistributedConnection
temporary,JMSServer server)
{
destination=dest;
@@ -65,31 +63,38 @@
alreadyInTaskQueue=false;
temporaryDestination=temporary;
this.server=server;
- messagesWaitingForAck=new LinkedList();
isTopic=dest instanceof SpyTopic;
listeners=0;
}
+
+
- // Package protected ---------------------------------------------
-
+ // Package protected ---------------------------------------------
void addSubscriber(SpyDistributedConnection dc) throws JMSException
{
//We want to avoid removeSubscriber, addSubscriber or sendOneMessage
to work concurently
synchronized (destination) {
if
(temporaryDestination!=null&&!temporaryDestination.equals(dc)) throw new
JMSException("You cannot subscriber to this temporary destination");
- subscribers.put(dc.getClientID(),dc);
+
+ Object qr = subscribers.get(dc.getClientID());
+ if( qr == null ) {
+ subscribers.put(dc.getClientID(),new
JMSServerQueueReceiver(this,dc));
+ }
}
}
+
+
void removeSubscriber(SpyDistributedConnection dc,Iterator i)
{
//We want to avoid removeSubscriber, addSubscriber or sendOneMessage
to work concurently
synchronized (destination) {
-
- SpyDistributedConnection
distributedConnection=(SpyDistributedConnection)subscribers.get(dc.getClientID());
- if (distributedConnection==null) return;
- if (distributedConnection.listeners) listeners--;
+ JMSServerQueueReceiver
qr=(JMSServerQueueReceiver)subscribers.get(dc.getClientID());
+ if (qr==null) return;
+
+ qr.close();
+
if (i==null) {
if (subscribers.remove(dc.getClientID())==null)
Log.notice("WARNING: Could not remove "+dc.getClientID());
} else {
@@ -99,7 +104,9 @@
}
}
- public void addMessage(SpyMessage mes) throws JMSException
+
+
+ public void addMessage(SpyMessage mes)
{
//Add a message to the message list...
synchronized (messages)
@@ -120,6 +127,8 @@
}
}
+
+
//Clear the message queue
synchronized SpyMessage[] startWork()
{
@@ -138,7 +147,9 @@
return mes;
}
}
+
+
synchronized SpyMessage startWorkQueue()
{
synchronized (messages) {
@@ -152,7 +163,9 @@
return m;
}
}
+
+
void endWork()
{
//The thread has finished his work...
@@ -168,6 +181,8 @@
}
}
+
+
void sendOneMessage(SpyMessage mes)
{
//we can only add/remove a subscribers once the message is sent (
iterator is fail-fast )
@@ -177,18 +192,20 @@
Iterator i=subscribers.values().iterator();
while (i.hasNext()) {
- SpyDistributedConnection
dc=(SpyDistributedConnection)i.next();
+ JMSServerQueueReceiver
qr=(JMSServerQueueReceiver)i.next();
try {
- dc.cr.receive(destination,mes);
+ qr.sendOneMessage(mes);
} catch (Exception e) {
- Log.notice("Cannot deliver this message to the
client "+dc);
+ Log.notice("Cannot deliver this message to the
client: "+qr.dc.getClientID());
Log.notice(e);
- handleConnectionFailure(dc,i);
+ handleConnectionFailure(qr.dc,i);
}
}
}
}
+
+
void sendMultipleMessages(SpyMessage mes[])
{
synchronized (subscribers) {
@@ -197,13 +214,13 @@
Iterator i=subscribers.values().iterator();
while (i.hasNext()) {
- SpyDistributedConnection
dc=(SpyDistributedConnection)i.next();
+ JMSServerQueueReceiver
qr=(JMSServerQueueReceiver)i.next();
try {
- dc.cr.receiveMultiple(destination,mes);
+ qr.sendMultipleMessages(mes);
} catch (Exception e) {
- Log.error("Cannot deliver those messages to
the client "+dc.getClientID());
+ Log.error("Cannot deliver those messages to
the client "+qr.dc.getClientID());
Log.error(e);
- handleConnectionFailure(dc,i);
+ handleConnectionFailure(qr.dc,i);
}
}
}
@@ -216,7 +233,9 @@
Log.notice("Warning: The DistributedConnection was still registered
for "+destination);
removeSubscriber(dc,null);
}
+
+
void notifyWorkers()
{
//It is useless to put many times the same destination in the task
queue
@@ -228,7 +247,9 @@
server.taskQueue.notify();
}
}
+
+
private void handleConnectionFailure(SpyDistributedConnection dc,Iterator i)
{
//We should try again :) This behavior should under control of a
Failure-Plugin
@@ -241,79 +262,8 @@
removeSubscriber(dc,i);
}
- /**
- * Get a SpyDistributedConnection object that is listening
- * to this queue. If multiple objects are listening to the queue
- * this multiple calls to this method will cycle through them in a round
- * robin fasion.
- */
- private SpyDistributedConnection pickNextRoundRobinConnection() {
-
- // No valid next connection will exist, return null
- if (listeners == 0)
- return null;
-
- Iterator i = subscribers.values().iterator();
- SpyDistributedConnection firstFoundConnection = null;
- boolean enableSelectNext = false;
-
- while (i.hasNext()) {
- SpyDistributedConnection t = (SpyDistributedConnection)
i.next();
-
- // Select the next valid connection if we are past the last
used connection
- if (t == lastUsedConnection || lastUsedConnection == null)
- enableSelectNext = true;
-
- // Test to see if the connection is valid pick
- if (t.listeners) {
- // Store the first valid connection since the last
used might be the last
- // in the list
- if (firstFoundConnection == null)
- firstFoundConnection = t;
-
- // Are we past the last used? then we have the next
item in the round robin
- if (enableSelectNext && t != lastUsedConnection) {
- lastUsedConnection = t;
- return t;
- }
- }
- }
- // We got here because we did not find a valid item in the list after
the last
- // used item, so lest use the first valid item
- if (firstFoundConnection != null) {
- lastUsedConnection = firstFoundConnection;
- return firstFoundConnection;
- } else {
- Log.error("FIXME: The listeners count was invalid !");
- return null;
- }
- }
- /**
- * Get a SpyDistributedConnection object that is listening
- * to this queue. Picks the first one it can find.
- */
- private SpyDistributedConnection pickFirstFoundConnection() {
-
- // No valid next connection will exist, return null
- if (listeners == 0)
- return null;
-
- Iterator i = subscribers.values().iterator();
- while (i.hasNext()) {
- SpyDistributedConnection t = (SpyDistributedConnection)
i.next();
-
- // Test to see if the connection is valid pick
- if (t.listeners) {
- return t;
- }
- }
-
- // We got here because we did not find a valid item in the list.
- Log.error("FIXME: The listeners count was invalid !");
- return null;
- }
void doMyJob() throws JMSException
{
@@ -323,23 +273,23 @@
SpyMessage[] msgs=startWork();
//Let the thread do its work
- if (msgs.length>1) {
+ if (msgs.length == 1) {
+
+ if (!msgs[0].isOutdated()) {
+ sendOneMessage(msgs[0]);
+ }
+
+ } else if (msgs.length>1) {
//We can send multiple messages
- Log.log("DISPATCH: "+msgs.length+" messages =>
"+destination);
sendMultipleMessages(msgs);
- } else {
- //Send each message
- for(int i=0;i<msgs.length;i++) {
- SpyMessage message=(SpyMessage)msgs[i];
- Log.log("DISPATCH: "+message+" =>
"+destination);
- if (!message.isOutdated())
sendOneMessage(message);
- }
}
//Notify that it has finished its work : another thread can
start working on this queue
endWork();
} else {
+
+ Log.log("Dispatching messages");
synchronized (this) {
//In the Queue case, we synchronize on [this] to avoid
changes (listening modifications)
@@ -350,66 +300,52 @@
//At first, find a receiver
//NL: We could find a better receiver (load
balancing ?)
//HC: Using Round Robin should provide some
load balancing
-
- Log.log("get a receiver");
-
- // we may have to restore the
lastUsedConnection
- // if message on the queue is not sent. (we
don't want to skip
- // destination in the round robin)
- SpyDistributedConnection
saveLastConnection=lastUsedConnection;
- SpyDistributedConnection dc;
-
- if(
useRoundRobinMessageDistribution ) {
-
dc=pickNextRoundRobinConnection();
- } else
{
-
dc=pickFirstFoundConnection();
- }
- if (
dc == null ) break;
-
+
//Get the message ( if there is one message
pending )
SpyMessage mes=startWorkQueue();
if (mes==null) {
- lastUsedConnection=saveLastConnection;
+ Log.log("Done dispatching messages: No
more message to send");
break;
}
- if (mes.isOutdated()) {
- lastUsedConnection=saveLastConnection;
+
+ if (mes.isOutdated())
continue;
+
+ // we may have to restore the
lastUsedQueueReceiver
+ // if message on the queue is not sent. (we
don't want to skip
+ // destination in the round robin)
+ JMSServerQueueReceiver qr;
+ if( useRoundRobinMessageDistribution ) {
+ qr=pickNextRoundRobinQueueReceiver();
+ } else {
+ qr=pickFirstFoundQueueReceiver();
}
-
+ if ( qr == null ) {
+ restoreMessage(mes);
+ Log.log("Done dispatching messages: No
receiver available for dispatch");
+ break;
+ }
+
//Send the message
try {
- dc.cr.receive(destination,mes);
+ qr.sendOneMessage(mes);
} catch (NoReceiverException e) {
//Log.log(e);
- Log.log("There was no receiver for the
client "+dc);
+ Log.log("Got a NoReceiverException
from: "+qr.dc.getClientID());
+ restoreMessage(mes);
+ qr.setListening(false);
- try {
- synchronized (messages) {
- messages.add(mes);
- }
- connectionListening(false,dc);
- } catch (Exception e2) {
- Log.error(e2);
- }
-
} catch (JMSException e) {
throw e;
} catch (Exception e) {
//This is a transport failure. We
should define our own Transport Failure class
- //for a better execption catching
- try {
- synchronized (messages) {
- messages.add(mes);
- }
- } catch (Exception e2) {
- Log.error(e2);
- }
+ //for a better execption catching
- Log.error("Cannot deliver this message
to the client "+dc);
+ restoreMessage(mes);
+ Log.error("Cannot deliver this message
to the client "+qr.dc.getClientID());
Log.error(e);
- handleConnectionFailure(dc,null);
+ handleConnectionFailure(qr.dc,null);
}
}
@@ -421,40 +357,20 @@
}
}
- SpyMessage queueReceiveNoWait()
- {
- synchronized (messages) {
- if (messages.size()==0) return null;
- SpyMessage m = (SpyMessage)messages.first();
- messages.remove(m);
- return m;
- }
- }
+
void connectionListening(boolean mode,SpyDistributedConnection dc) throws
JMSException
{
- // Before check
-
+ // Before check
// Synchronized code : We want to avoid sending messages while we are
changing the connection status
synchronized (this) {
- SpyDistributedConnection
distributedConnection=(SpyDistributedConnection)subscribers.get(dc.getClientID());
- if (distributedConnection==null) throw new JMSException("This
DistributedConnection is not registered");
-
- if (mode) {
- if (!distributedConnection.listeners) {
- distributedConnection.listeners=true;
- listeners++;
- }
- } else {
- if (distributedConnection.listeners) {
- distributedConnection.listeners=false;
- listeners--;
- }
- }
+ JMSServerQueueReceiver
qr=(JMSServerQueueReceiver)subscribers.get(dc.getClientID());
+ if (qr==null) throw new JMSException("This
DistributedConnection is not registered");
+ qr.setListening(mode);
if (listeners!=0&&!threadWorking&&!alreadyInTaskQueue) {
synchronized (messages) {
@@ -462,7 +378,152 @@
}
}
- Log.log("Listeners for "+destination+" = "+listeners);
}
}
+
+
+
+ public void acknowledge(SpyDistributedConnection dc, String messageId, boolean
isAck) throws JMSException {
+
+ JMSServerQueueReceiver qr =
(JMSServerQueueReceiver)subscribers.get(dc.getClientID());
+ if( qr==null )
+ throw new JMSException("You have not subscribed to this
destination.");
+
+ qr.acknowledge(messageId, isAck);
+
+ }
+
+
+
+ /**
+ * Get a SpyDistributedConnection object that is listening
+ * to this queue. Picks the first one it can find.
+ */
+ private JMSServerQueueReceiver pickFirstFoundQueueReceiver() {
+
+ // No valid next connection will exist, return null
+ if (listeners == 0)
+ return null;
+
+ Iterator i = subscribers.values().iterator();
+ while (i.hasNext()) {
+ JMSServerQueueReceiver t = (JMSServerQueueReceiver) i.next();
+
+ // Test to see if the connection is valid pick
+ if (t.isListening()) {
+ return t;
+ }
+ }
+
+ // We got here because we did not find a valid item in the list.
+ Log.error("FIXME: The listeners count was invalid !");
+ return null;
+ }
+
+
+
+ /**
+ * Get a JMSServerQueueReceiver object that is listening
+ * to this queue. If multiple objects are listening to the queue
+ * this multiple calls to this method will cycle through them in a round
+ * robin fasion.
+ */
+ private JMSServerQueueReceiver pickNextRoundRobinQueueReceiver() {
+
+ // No valid next connection will exist, return null
+ if (listeners == 0)
+ return null;
+
+ Iterator i = subscribers.values().iterator();
+ JMSServerQueueReceiver firstFoundConnection = null;
+ boolean enableSelectNext = false;
+
+ while (i.hasNext()) {
+ JMSServerQueueReceiver t = (JMSServerQueueReceiver) i.next();
+
+ // Select the next valid connection if we are past the last
used connection
+ if (t == lastUsedQueueReceiver || lastUsedQueueReceiver ==
null)
+ enableSelectNext = true;
+
+ // Test to see if the connection is valid pick
+ if (t.isListening()) {
+ // Store the first valid connection since the last
used might be the last
+ // in the list
+ if (firstFoundConnection == null)
+ firstFoundConnection = t;
+
+ // Are we past the last used? then we have the next
item in the round robin
+ if (enableSelectNext && t != lastUsedQueueReceiver) {
+ lastUsedQueueReceiver = t;
+ return t;
+ }
+ }
+ }
+
+ // We got here because we did not find a valid item in the list after
the last
+ // used item, so lest use the first valid item
+ if (firstFoundConnection != null) {
+ lastUsedQueueReceiver = firstFoundConnection;
+ return firstFoundConnection;
+ } else {
+ Log.error("FIXME: The listeners count was invalid !");
+ return null;
+ }
+ }
+
+
+
+
+ //Used by QueueReceivers for receive(), receive(long wait), and receiveNoWait()
+ SpyMessage queueReceive(long wait, SpyDistributedConnection dc) throws
JMSException
+ {
+
+ // Synchronized code : We want to avoid sending messages while we are
changing the connection status
+ synchronized (this) {
+ JMSServerQueueReceiver
qr=(JMSServerQueueReceiver)subscribers.get(dc.getClientID());
+ if (qr==null) throw new JMSException("This
DistributedConnection is not registered");
+
+ if( wait < 0 ) {
+ synchronized (messages) {
+ if (messages.size()==0) return null;
+ SpyMessage m = (SpyMessage)messages.first();
+ messages.remove(m);
+ return m;
+ }
+ } else {
+
+ qr.addReceiver(wait);
+
+ if (listeners!=0&&!threadWorking&&!alreadyInTaskQueue)
{
+ synchronized (messages) {
+ if (!messages.isEmpty())
notifyWorkers();
+ }
+ }
+
+ return null;
+ }
+ }
+ }
+
+
+
+ //Used to put a message that was added previously to the queue, back in the
queue
+ public void restoreMessage(SpyMessage mes)
+ {
+ //restore a message to the message list...
+ synchronized (messages) {
+ messages.add(mes);
+
+ if (isTopic) {
+ //if a thread is already working on this destination,
I don't have to myself to the taskqueue
+ if (!threadWorking) notifyWorkers();
+ } else {
+ if (listeners!=0&&!threadWorking) notifyWorkers();
+ }
+
+ }
+ }
+
+
+
}
1.15 +256 -92 spyderMQ/src/java/org/spydermq/SpyConnection.java
Index: SpyConnection.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyConnection.java,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -r1.14 -r1.15
--- SpyConnection.java 2000/11/16 22:39:46 1.14
+++ SpyConnection.java 2000/11/19 19:59:56 1.15
@@ -31,7 +31,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.14 $
+ * @version $Revision: 1.15 $
*/
public class SpyConnection
implements Connection, Serializable
@@ -140,14 +140,22 @@
if (!modeStop) return;
modeStop=false;
- Iterator i=destinations.values().iterator();
+ Iterator i=destinations.keySet().iterator();
while (i.hasNext()) {
- ConnectionQueue cq=(ConnectionQueue)i.next();
- cq.start();
+ Destination d=(Destination)i.next();
+ ConsumerSet ci=(ConsumerSet)destinations.get(d);
+
+ if ( ci.getLasListeningState() ) {
+ try {
+
provider.connectionListening(true,d,distributedConnection);
+ } catch ( Exception e ) {
+ failureHandler(e, "Cannot contact the JMS
server");
+ }
+ }
+
}
- changeModeStop(modeStop);
-
+ changeModeStop(modeStop);
}
public void stop() throws JMSException
@@ -157,11 +165,20 @@
if (modeStop) return;
modeStop=true;
-
- Iterator i=destinations.values().iterator();
+
+ Iterator i=destinations.keySet().iterator();
while (i.hasNext()) {
- ConnectionQueue cq=(ConnectionQueue)i.next();
- cq.stop();
+ Destination d=(Destination)i.next();
+ ConsumerSet ci=(ConsumerSet)destinations.get(d);
+
+ if ( ci.getLasListeningState() ) {
+ try {
+
provider.connectionListening(false,d,distributedConnection);
+ } catch ( Exception e ) {
+ failureHandler(e, "Cannot contact the JMS
server");
+ }
+ }
+
}
changeModeStop(modeStop);
@@ -278,75 +295,9 @@
}
}
- //A Session has created a new MessageConsumer for the Destination dest
- void addSession(Destination dest, SpySession who) throws JMSException
- {
- if (closed) throw new IllegalStateException("The connection is
closed");
- if (distributedConnection==null) createReceiver();
-
- Log.log("Connection: addSession(dest="+dest.toString()+")");
-
-
- try {
-
- synchronized (destinations) {
-
- ConnectionQueue
connectionQueue=(ConnectionQueue)destinations.get(dest);
-
- if (connectionQueue==null) {
- connectionQueue=new ConnectionQueue(dest,this);
- connectionQueue.addSession(who);
- HashMap
newDestinations=(HashMap)destinations.clone();
- newDestinations.put(dest,connectionQueue);
- destinations=newDestinations;
- provider.subscribe(dest,distributedConnection);
- } else {
- connectionQueue.addSession(who);
- }
- }
-
- } catch (Exception e) {
- failureHandler(e,"Cannot subscribe to this Destination");
- }
- }
- //The session does not need to recieve the messages to Destination dest
- void removeSession(Destination dest, SpySession who) throws JMSException
- {
- if (distributedConnection==null) createReceiver();
-
- Log.log("Connection: removeSession(dest="+dest.toString()+")");
-
- try {
-
- synchronized (destinations) {
-
- ConnectionQueue
connectionQueue=(ConnectionQueue)destinations.get(dest);
-
- if (connectionQueue!=null) {
- boolean
empty=connectionQueue.removeSession(who);
- if (empty) {
- HashMap
newDestinations=(HashMap)destinations.clone();
- newDestinations.remove(dest);
- destinations=newDestinations;
-
provider.unsubscribe(dest,distributedConnection);
- }
- } else {
- //this should not happen
- HashMap
newDestinations=(HashMap)destinations.clone();
- newDestinations.remove(dest);
- destinations=newDestinations;
-
provider.unsubscribe(dest,distributedConnection);
- }
-
- }
-
- } catch (Exception e) {
- failureHandler(e,"Cannot unsubscribe to this destination");
- }
- }
//Get a new messageID (creation of a new message)
String getNewMessageID() throws JMSException
@@ -383,15 +334,7 @@
//We could check this, though
}
- SpyMessage queueReceiveNoWait(Queue queue) throws JMSException
- {
- try {
- return provider.queueReceiveNoWait(queue);
- } catch (Exception e) {
- failureHandler(e,"Cannot create a ConnectionReceiver");
- return null;
- }
- }
+
// Protected -------------------------------------------------------
@@ -401,6 +344,7 @@
try {
if (clientID==null) askForAnID();
distributedConnection=ConnectionReceiverFactory.createDistributedConnection(clientID,this,crClassName);
+ provider.setSpyDistributedConnection(distributedConnection);
} catch (Exception e) {
failureHandler(e,"Cannot create a ConnectionReceiver");
}
@@ -416,9 +360,10 @@
}
}
+
public void failureHandler(Exception e,String reason) throws JMSException
{
- Log.error(e);
+ e.printStackTrace();
JMSException excep=new JMSException(reason);
excep.setLinkedException(e);
@@ -431,11 +376,230 @@
throw excep;
}
+
+
+    /**
+     * Acknowledges (isAck == true) or negatively acknowledges (isAck == false)
+     * a single message.
+     *
+     * The destination and message id are wrapped in a one-element
+     * SpyAcknowledgementItem array and handed to the provider together with
+     * this connection's distributedConnection.
+     *
+     * @param dest      destination stored in the acknowledgement item
+     * @param messageId JMS message id stored in the acknowledgement item
+     * @param isAck     flag forwarded to the provider unchanged
+     * @throws JMSException raised through failureHandler when anything in the call fails
+     */
+    protected void acknowledge(Destination dest, String messageId, boolean isAck) throws JMSException
+    {
+        try {
+            SpyAcknowledgementItem single = new SpyAcknowledgementItem();
+            single.jmsDestination = dest;
+            single.jmsMessageID = messageId;
+
+            SpyAcknowledgementItem[] batch = new SpyAcknowledgementItem[] { single };
+            provider.acknowledge(batch, isAck, distributedConnection);
+        } catch (Exception e) {
+            failureHandler(e,"Cannot acknowlege a message.");
+        }
+    }
+
+
-/**
- * Creation date: (11/16/2000 2:20:22 PM)
- * @return org.spydermq.distributed.interfaces.DistributedJMSServer
- */
-public org.spydermq.distributed.interfaces.DistributedJMSServer getProvider() {
- return provider;
-}}
+ /**
+ * Returns the provider field of this connection.
+ *
+ * @return the DistributedJMSServer that the methods of this class invoke
+ * (subscribe, unsubscribe, acknowledge, queueReceive, ...)
+ */
+ public DistributedJMSServer getProvider() {
+ return provider;
+ }
+
+
+    /**
+     * Used to ack/nak a set of messages in one provider call.
+     *
+     * @param items acknowledgement items forwarded to the provider as-is
+     * @param isAck flag forwarded to the provider unchanged
+     * @throws JMSException raised through failureHandler when the provider call fails
+     */
+    protected void acknowledge(SpyAcknowledgementItem[] items, boolean isAck) throws JMSException
+    {
+        try {
+            provider.acknowledge(items, isAck, distributedConnection);
+        }
+        catch (Exception e) {
+            failureHandler(e,"Cannot acknowlege a message.");
+        }
+    }
+
+    /**
+     * Set of the SpyMessageConsumer objects registered for one destination,
+     * plus the last "is anybody listening" state computed for it.
+     *
+     * Used by addConsumer(), removeConsumer(), getConsumers(),
+     * listenerChange() and pickListeningConsumer().
+     */
+    class ConsumerSet extends HashSet {
+        // Result of the most recent listenStateChanged() evaluation
+        boolean lasListeningState=false;
+
+        /** @return the listening state stored by the last listenStateChanged() call */
+        boolean getLasListeningState() {
+            return lasListeningState;
+        }
+
+        /**
+         * Recomputes whether at least one consumer in the set is listening
+         * and stores the result.
+         *
+         * @return true when the recomputed state differs from the stored one
+         */
+        boolean listenStateChanged() {
+            boolean anyListening = false;
+
+            // Stop scanning as soon as one listening consumer is found
+            for (Iterator it = iterator(); !anyListening && it.hasNext(); ) {
+                SpyMessageConsumer candidate = (SpyMessageConsumer)it.next();
+                anyListening = candidate.isListening();
+            }
+
+            boolean changed = (anyListening != lasListeningState);
+            if (changed) {
+                lasListeningState = anyListening;
+            }
+            return changed;
+        }
+    }
+
+    /**
+     * A new consumer has been created for the destination dest.
+     *
+     * The first consumer of a destination creates its ConsumerSet, publishes
+     * it in a fresh copy of the destinations map and subscribes this
+     * connection at the provider; later consumers are only added to the
+     * existing set.
+     *
+     * @param dest     destination the consumer wants messages from
+     * @param consumer consumer to register
+     * @throws JMSException raised through failureHandler when registering or subscribing fails
+     */
+    void addConsumer(Destination dest, SpyMessageConsumer consumer) throws JMSException
+    {
+        if (closed) throw new IllegalStateException("The connection is closed");
+        if (distributedConnection==null) createReceiver();
+
+        Log.log("Connection: addConsumer(dest="+dest.toString()+")");
+
+        try {
+            synchronized (destinations) {
+                ConsumerSet registered = (ConsumerSet)destinations.get(dest);
+
+                if (registered != null) {
+                    registered.add(consumer);
+                } else {
+                    ConsumerSet fresh = new ConsumerSet();
+                    fresh.add(consumer);
+
+                    // The map is replaced by a modified clone rather than mutated in place
+                    HashMap updated = (HashMap)destinations.clone();
+                    updated.put(dest, fresh);
+                    destinations = updated;
+
+                    provider.subscribe(dest, distributedConnection);
+                }
+            }
+        } catch (Exception e) {
+            failureHandler(e,"Cannot subscribe to this Destination");
+        }
+    }
+
+
+    /**
+     * A consumer no longer needs to receive the messages of a destination.
+     *
+     * The consumer is removed from the destination's ConsumerSet. Only when
+     * that set has become empty (or, which should not happen, when no set was
+     * registered at all) is the destination dropped from the destinations map
+     * and this connection unsubscribed at the provider.
+     *
+     * @param dest destination the consumer was registered for
+     * @param who  consumer to unregister
+     * @throws JMSException raised through failureHandler when unsubscribing fails
+     */
+    void removeConsumer(Destination dest, SpyMessageConsumer who) throws JMSException {
+
+        if (distributedConnection==null) createReceiver();
+
+        Log.log("Connection: removeConsumer(dest="+dest.toString()+")");
+
+        try {
+
+            synchronized (destinations) {
+
+                ConsumerSet consumerSet=(ConsumerSet)destinations.get(dest);
+
+                if (consumerSet!=null) {
+                    // HashSet.remove() reports whether 'who' was present, not
+                    // whether the set is now empty, so emptiness is checked explicitly.
+                    consumerSet.remove(who);
+                    if (!consumerSet.isEmpty()) {
+                        // Other consumers still need this destination: stay subscribed
+                        return;
+                    }
+                }
+
+                // Last consumer gone (or no set registered, which should not
+                // happen): forget the destination and unsubscribe.
+                HashMap newDestinations=(HashMap)destinations.clone();
+                newDestinations.remove(dest);
+                destinations=newDestinations;
+                provider.unsubscribe(dest,distributedConnection);
+            }
+
+        } catch (Exception e) {
+            failureHandler(e,"Cannot unsubscribe to this destination");
+        }
+
+    }
+
+
+    /**
+     * Gets all the consumers subscribed to a destination.
+     *
+     * @param dest destination to look up
+     * @return an array holding the consumers registered for dest, or null
+     *         when none are registered
+     * @throws JMSException declared for the closed check and createReceiver()
+     */
+    public SpyMessageConsumer[] getConsumers(Destination dest) throws JMSException
+    {
+        if (closed) throw new IllegalStateException("The connection is closed");
+        if (distributedConnection==null) createReceiver();
+
+        synchronized (destinations) {
+            ConsumerSet subscribed = (ConsumerSet)destinations.get(dest);
+
+            boolean hasConsumers = (subscribed != null) && (subscribed.size() != 0);
+            if (!hasConsumers) {
+                return null;
+            }
+
+            SpyMessageConsumer[] snapshot = new SpyMessageConsumer[subscribed.size()];
+            return (SpyMessageConsumer[])subscribed.toArray(snapshot);
+        }
+    }
+
+    /**
+     * Gets the first consumer (in set iteration order) that is listening to a
+     * destination or is currently waiting in a receive.
+     *
+     * @param dest destination to look up
+     * @return the first matching consumer, or null when there is none
+     * @throws JMSException declared for the closed check and createReceiver()
+     */
+    public SpyMessageConsumer pickListeningConsumer(Destination dest) throws JMSException
+    {
+        if (closed) throw new IllegalStateException("The connection is closed");
+        if (distributedConnection==null) createReceiver();
+
+        synchronized (destinations) {
+            ConsumerSet candidates = (ConsumerSet)destinations.get(dest);
+
+            // An empty set simply yields no iteration, so only null needs a guard
+            if (candidates != null) {
+                for (Iterator it = candidates.iterator(); it.hasNext(); ) {
+                    SpyMessageConsumer candidate = (SpyMessageConsumer)it.next();
+                    if (candidate.isListening() || candidate.waitInReceive) {
+                        return candidate;
+                    }
+                }
+            }
+            return null;
+        }
+    }
+
+
+
+
+    /**
+     * Called whenever a consumer changes its listening state on a destination.
+     * If that change flipped the overall listening state of the destination
+     * (as tracked by its ConsumerSet), the provider is told the new state.
+     *
+     * Nothing is reported when no ConsumerSet is registered for the destination.
+     *
+     * @param d destination whose consumer changed its listening state
+     * @throws JMSException raised through failureHandler when the provider call fails
+     */
+    public void listenerChange(Destination d) throws JMSException {
+
+        if (closed) throw new IllegalStateException("The connection is closed");
+        if (distributedConnection==null) createReceiver();
+
+        ConsumerSet ci=(ConsumerSet)destinations.get(d);
+
+        // No consumers are registered for this destination (for instance the
+        // last one was already removed): there is no state to report.
+        if (ci==null) return;
+
+        if (!ci.listenStateChanged()) return;
+
+        try {
+            provider.connectionListening(ci.getLasListeningState(),d,distributedConnection);
+        } catch ( Exception e ) {
+            failureHandler(e, "Cannot contact the JMS server");
+        }
+
+    }
+
+
+    /**
+     * Asks the provider for a message from a queue on behalf of this connection.
+     *
+     * @param queue queue to receive from
+     * @param wait  forwarded to the provider unchanged.
+     *              NOTE(review): SpyQueueReceiver passes 0 from receive(), the
+     *              timeout from receive(long) and -1 from receiveNoWait();
+     *              confirm how the provider interprets these values.
+     * @return the value returned by the provider
+     * @throws JMSException raised through failureHandler when the provider call fails
+     */
+    SpyMessage queueReceive(Queue queue, long wait) throws JMSException {
+
+        try {
+            return provider.queueReceive(queue, wait, distributedConnection);
+        } catch (Exception e) {
+            // Report the real failure; the former text spoke of creating a ConnectionReceiver
+            failureHandler(e,"Cannot receive a message from the queue");
+            // Needed for the compiler; failureHandler ends by throwing the JMSException
+            return null;
+        }
+    }
+
+}
1.7 +7 -6 spyderMQ/src/java/org/spydermq/SpyDistributedConnection.java
Index: SpyDistributedConnection.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyDistributedConnection.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- SpyDistributedConnection.java 2000/11/04 13:30:23 1.6
+++ SpyDistributedConnection.java 2000/11/19 19:59:57 1.7
@@ -6,16 +6,18 @@
*/
package org.spydermq;
+import java.util.HashMap;
+import java.io.Serializable;
import org.spydermq.distributed.interfaces.ConnectionReceiver;
import org.spydermq.distributed.interfaces.ConnectionReceiverSetup;
-import java.io.Serializable;
+
/**
* This class is the broker point of view on a SpyConnection (it contains a
ConnectionReceiver)
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
*/
public class SpyDistributedConnection
implements Serializable
@@ -24,7 +26,6 @@
private String clientID;
private int hash;
public transient ConnectionReceiverSetup cr_server;
- public transient boolean listeners;
public ConnectionReceiver cr;
public SpyDistributedConnection(String clientID,ConnectionReceiverSetup
cr_server)
@@ -47,9 +48,9 @@
public boolean equals(Object obj)
{
- // Fixes NPE. Patch submitted by John Ellis (10/29/00)
- if (obj==null) return false;
-
+ // Fixes NPE. Patch submitted by John Ellis (10/29/00)
+ if (obj==null) return false;
+
if (obj.getClass()!=SpyDistributedConnection.class) return false;
if (obj.hashCode()!=hash) return false;
if (clientID==null) return true;
1.7 +49 -21 spyderMQ/src/java/org/spydermq/SpyMessage.java
Index: SpyMessage.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyMessage.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- SpyMessage.java 2000/11/17 17:30:27 1.6
+++ SpyMessage.java 2000/11/19 19:59:57 1.7
@@ -15,8 +15,15 @@
import java.util.Enumeration;
import java.util.Hashtable;
import java.io.Serializable;
-
import java.lang.Comparable;
+
+/**
+ * This class implements javax.jms.Message
+ *
+ * @author Norbert Lataille ([EMAIL PROTECTED])
+ *
+ * @version $Revision: 1.7 $
+ */
public class SpyMessage
implements Serializable, Cloneable, Message, Comparable
{
@@ -58,20 +65,21 @@
//Message body
protected boolean msgReadOnly=false;
- //Those attributes are transient ---------------
-
- //For acknowledgment
- private transient SessionQueue mySessionQueue;
+ //For acknowledgment (set on the client side)
+ private transient SpySession spySession;
+
//For the storage in the JMSServerQueue object
public transient SpyDistributedConnection originalDistributedConnection;
-
+ //For ordering in the JMSServerQueue
+ public transient long messageId;
+
// Constructor ---------------------------------------------------
SpyMessage()
{
prop=new Hashtable();
propReadWrite=true;
- mySessionQueue=null;
+ spySession=null;
}
// Public --------------------------------------------------------
@@ -401,18 +409,15 @@
public void acknowledge() throws JMSException
{
- //There is no need to acknowledge() this message
- if (mySessionQueue==null) return;
+ if (spySession==null)
+ throw new JMSException("This message was not recieved from the
provider");
+
+ if( spySession.acknowledgeMode == spySession.CLIENT_ACKNOWLEDGE )
+ doAcknowledge();
- mySessionQueue.acknowledge(this);
}
- // Package protected ---------------------------------------------
-
- void setSessionQueue(SessionQueue sessionQueue)
- {
- mySessionQueue=sessionQueue;
- }
+
void setReadOnlyMode()
{
@@ -435,9 +440,9 @@
long ts=System.currentTimeMillis();
return jmsExpiration<ts;
}
-
- //For ordering in the JMSServerQueue
- public transient long messageId; /**
+
+
+ /**
* Return a negative number if this message should be sent
* before the o message. Return a positive if should be sent
* after the o message.
@@ -452,6 +457,29 @@
if( jmsPriority < sm.jmsPriority ) {
return 1;
}
- return (int)(messageId - sm.messageId);
+ return (int)(messageId - sm.messageId);
+ }
+
+
+    /**
+     * Sends a positive acknowledgement for this message (its jmsDestination
+     * and jmsMessageID) through the connection of the session stored in
+     * spySession.
+     *
+     * @throws JMSException if the connection's acknowledge call fails
+     */
+    public void doAcknowledge() throws JMSException
+    {
+        SpyConnection owner = spySession.getConnection();
+        owner.acknowledge(jmsDestination, jmsMessageID, true);
+    }
+
+
+    /**
+     * Sends a negative acknowledgement for this message (its jmsDestination
+     * and jmsMessageID) through the connection of the session stored in
+     * spySession.
+     *
+     * @throws JMSException if the connection's acknowledge call fails
+     */
+    public void doNegAcknowledge() throws JMSException
+    {
+        SpyConnection owner = spySession.getConnection();
+        owner.acknowledge(jmsDestination, jmsMessageID, false);
+    }
+
- } }
+ /**
+ * Returns the session used for acknowledgment of this message.
+ *
+ * @return the spySession field; null until setSpySession() has been called
+ * (the constructor initialises it to null)
+ */
+ public SpySession getSpySession() {
+ return spySession;
+ }
+
+
+ /**
+ * Stores the session that acknowledge(), doAcknowledge() and
+ * doNegAcknowledge() use for this message.
+ *
+ * @param newSpySession session to store in the spySession field
+ */
+ public void setSpySession(SpySession newSpySession) {
+ spySession = newSpySession;
+ }
+
+}
1.4 +64 -61 spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java
Index: SpyMessageConsumer.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- SpyMessageConsumer.java 2000/06/09 20:03:58 1.3
+++ SpyMessageConsumer.java 2000/11/19 19:59:57 1.4
@@ -11,7 +11,9 @@
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.Session;
+import javax.jms.Destination;
import java.util.LinkedList;
+import java.util.Iterator;
import org.spydermq.selectors.Selector;
/**
@@ -19,7 +21,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class SpyMessageConsumer
implements MessageConsumer
@@ -27,7 +29,7 @@
// Attributes ----------------------------------------------------
//Link to my session
- protected SpySession session;
+ public SpySession session;
//My message listener (null if none)
MessageListener messageListener;
//Am I closed ?
@@ -36,12 +38,14 @@
public Selector selector;
//The message selector
public String messageSelector;
- //A link to my session queue (in my session)
- protected SessionQueue sessionQueue;
+
//List of Pending messages (not yet delivered)
LinkedList messages;
//Is the consumer sleeping in a receive() ?
boolean waitInReceive;
+ public Destination destination;
+ //If the session is transacted: contains JMSMessageId's of messages consumed
+ LinkedList messagesConsumed;
// Constructor ---------------------------------------------------
@@ -54,30 +58,30 @@
messageSelector=null;
messages=new LinkedList();
waitInReceive=false;
+ if( session.transacted ) {
+ messagesConsumed = new LinkedList();
+ }
}
- void setSessionQueue(SessionQueue sessionQueue)
- {
- this.sessionQueue=sessionQueue;
- }
+
// Public --------------------------------------------------------
- public String getMessageSelector() throws JMSException
+ public String getMessageSelector() throws JMSException
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
return messageSelector;
}
- public MessageListener getMessageListener() throws JMSException
+ public MessageListener getMessageListener() throws JMSException
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
return messageListener;
}
- public void setMessageListener(MessageListener listener) throws JMSException
+ public void setMessageListener(MessageListener listener) throws JMSException
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
if (waitInReceive) throw new JMSException("This MessageConsumer is
waiting in receive() !");
@@ -85,7 +89,7 @@
//The QueueReceiver object need to notify their session / connection /
the broker
}
- public Message receive() throws JMSException
+ public Message receive() throws JMSException
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
if (messageListener!=null) throw new JMSException("A message listener
is already registered");
@@ -94,7 +98,7 @@
return null;
}
- public Message receive(long timeOut) throws JMSException
+ public Message receive(long timeOut) throws JMSException
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
if (messageListener!=null) throw new JMSException("A message listener
is already registered");
@@ -103,7 +107,7 @@
return null;
}
- public Message receiveNoWait() throws JMSException
+ public Message receiveNoWait() throws JMSException
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
if (messageListener!=null) throw new JMSException("A message listener
is already registered");
@@ -112,7 +116,7 @@
return null;
}
- public synchronized void close() throws JMSException
+ public synchronized void close() throws JMSException
{
//Job is done in the inherited classes
//The QueueReceiver object need to notify their session / connection /
the broker
@@ -155,38 +159,18 @@
//the SAME Message object is put in different
SessionQueues
//when we deliver it, we have to clone() it to
insure independance
SpyMessage message=mes.myClone();
-
- if (!session.transacted) {
- if
(session.acknowledgeMode==Session.CLIENT_ACKNOWLEDGE) {
-
- synchronized
(sessionQueue.messagesWaitingForAck) {
- //Put the message in
the messagesWaitForAck queue
-
sessionQueue.messagesWaitingForAck.addLast(message);
- }
-
-
message.setSessionQueue(sessionQueue);
-
- } else if
(session.acknowledgeMode==Session.DUPS_OK_ACKNOWLEDGE) {
- //DUPS_OK_ACKNOWLEDGE
- } else {
- //AUTO_ACKNOWLEDGE
- //we don't need to keep this
message in a queue
- }
- } else {
-
- //We are linked to a transacted
session
-
- synchronized
(sessionQueue.messagesWaitingForAck) {
- //Put the message in the
messagesWaitForAck queue
-
sessionQueue.messagesWaitingForAck.addLast(message);
- }
-
+ message.setSpySession(session);
+
+ if( session.transacted ) {
+
messagesConsumed.addLast(message.getJMSMessageID());
+ } else if(
session.acknowledgeMode==session.AUTO_ACKNOWLEDGE ||
session.acknowledgeMode==session.DUPS_OK_ACKNOWLEDGE ) {
+ message.doAcknowledge();
}
-
+
return message;
} catch (Exception e) {
- Log.error(e);
+ e.printStackTrace();
}
}
@@ -195,28 +179,47 @@
}
- void addMessage(SpyMessage mes) throws JMSException
+ public void addMessage(SpyMessage mes) throws JMSException
{
+ //Set the session in the message so it can acknowlege
+ mes.setSpySession(session);
+
synchronized (messages) {
//Add a message to the queue
-
- //Test the priority
- int pri=mes.getJMSPriority();
-
- if (pri<=4) {
- //normal priority message
- messages.addLast(mes);
- } else {
- //expedited priority message
- int size=messages.size();
- int i=0;
- for(;i<size;i++) {
- if
(((SpyMessage)messages.get(i)).getJMSPriority()<pri) break;
- }
- messages.add(i,mes);
- }
-
+ messages.addLast(mes);
}
}
+
+    /**
+     * Tries to hand one pending message to this consumer.
+     *
+     * With a message listener registered, one message is fetched with
+     * getMessage() and passed to the listener; without a listener, a thread
+     * waiting on the messages list is notified.
+     *
+     * NOTE(review): the message-fetching code changed in this same commit
+     * already records the message id / auto-acknowledges before returning a
+     * message. If that code is getMessage(), the bookkeeping below runs a
+     * second time for listener deliveries - confirm.
+     *
+     * @return true when a message was delivered or a waiting receiver was
+     *         notified, false when there was nothing to do
+     * @throws JMSException if fetching or acknowledging the message fails
+     */
+    public boolean deliverMessage() throws JMSException
+    {
+        synchronized (messages) {
+            if (messages.isEmpty()) {
+                return false;
+            }
+
+            if (messageListener != null) {
+                SpyMessage delivered = getMessage();
+                if (delivered == null) {
+                    return false;
+                }
+
+                messageListener.onMessage(delivered);
+
+                if (session.transacted) {
+                    messagesConsumed.addLast(delivered.getJMSMessageID());
+                } else if (session.acknowledgeMode==session.AUTO_ACKNOWLEDGE
+                        || session.acknowledgeMode==session.DUPS_OK_ACKNOWLEDGE) {
+                    delivered.doAcknowledge();
+                }
+                return true;
+            }
+
+            // Synchronous case: only useful if somebody is blocked in receive()
+            if (!waitInReceive) {
+                return false;
+            }
+            messages.notify();
+            return true;
+        }
+    }
+
+
+ /**
+ * Tells whether this consumer is listening; this base implementation
+ * always answers false (SpyQueueReceiver overrides it).
+ *
+ * @return false
+ */
+ public boolean isListening() {
+ return false;
+ }
}
1.5 +119 -77 spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java
Index: SpyQueueReceiver.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- SpyQueueReceiver.java 2000/06/09 20:03:58 1.4
+++ SpyQueueReceiver.java 2000/11/19 19:59:57 1.5
@@ -17,7 +17,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public class SpyQueueReceiver
extends SpyMessageConsumer
@@ -32,9 +32,10 @@
// Constructor ---------------------------------------------------
- SpyQueueReceiver(SpyQueueSession session,Queue queue)
+ SpyQueueReceiver(SpyQueueSession session,Queue queue)
{
super(session);
+ this.destination=queue;
this.queue=queue;
listening=false;
}
@@ -59,122 +60,163 @@
}
//Overrides MessageConsumer
-
- public Message receive() throws JMSException
- {
+ public Message receive() throws JMSException {
super.receive();
-
- setListening(true);
-
+
+
+ //if the client follows the specification [4.4.6], he cannot use this
session
+ //to asynchronously receive a message or receive() in another thread.
+ //If a message is already pending for this session, we can immediatly
deliver it
synchronized (messages) {
-
- //if the client follows the specification [4.4.6], he cannot
use this session
- //to asynchronously receive a message or receive() in another
thread.
- //If a message is already pending for this session, we can
immediatly deliver it
-
- while (true) {
-
- if (closed) {
- setListening(false);
- return null;
- }
-
- if (!session.modeStop) {
- Message mes=getMessage();
- if (mes!=null) {
- setListening(false);
- return mes;
- }
- } else Log.log("the connection is stopped !");
- try {
- waitInReceive=true;
+ waitInReceive = true;
+ session.connection.queueReceive(queue, 0);
+
+ try {
+
+ while (true) {
+
+ if (!session.modeStop) {
+
+ Message mes = getMessage();
+ if (mes != null)
+ return mes;
+
+ } else
+ Log.log("the connection is stopped !");
+
messages.wait();
- } catch (InterruptedException e) {
- } finally {
- waitInReceive=false;
}
-
- }
+ } catch (InterruptedException e) {
+ JMSException newE = new JMSException("Receive
interupted");
+ newE.setLinkedException(e);
+ throw newE;
+ } finally {
+ waitInReceive=false;
+ }
}
-
}
- public Message receive(long timeOut) throws JMSException
+ public Message receive(long timeOut) throws JMSException
{
super.receive(timeOut);
-
+
if (timeOut==0) return receive();
long endTime=System.currentTimeMillis()+timeOut;
-
- setListening(true);
+
+ //if the client respects the specification [4.4.6], he cannot use this
session
+ //to asynchronously receive a message or receive() from another thread.
+ //If a message is already pending for this session, we can deliver it
synchronized (messages) {
- //if the client respects the specification [4.4.6], he cannot
use this session
- //to asynchronously receive a message or receive() from
another thread.
- //If a message is already pending for this session, we can
deliver it
-
- while (true) {
-
- if (closed) {
- setListening(false);
- return null;
- }
+ waitInReceive=true;
+ session.connection.queueReceive(queue,timeOut);
+
+ try {
- if (!session.modeStop) {
- Message mes=getMessage();
- if (mes!=null) {
- setListening(false);
- return mes;
+ while (true) {
+
+ if (!session.modeStop) {
+ Message mes=getMessage();
+ if (mes!=null) {
+ return mes;
+ }
+
+ } else
+ Log.log("the connection is stopped !");
+
+ long att=endTime-System.currentTimeMillis();
+ if (att<=0) {
+ return null;
}
- } else Log.log("the connection is stopped !");
-
- long att=endTime-System.currentTimeMillis();
- if (att<=0) {
- setListening(false);
- return null;
- }
- try {
- waitInReceive=true;
messages.wait(att);
- } catch (InterruptedException e) {
- } finally {
- waitInReceive=false;
}
- }
+ } catch (InterruptedException e) {
+ JMSException newE = new JMSException("Receive
interupted");
+ newE.setLinkedException(e);
+ throw newE;
+ } finally {
+ waitInReceive=false;
+ }
}
}
- public Message receiveNoWait() throws JMSException
+ public Message receiveNoWait() throws JMSException
{
- super.receiveNoWait();
-
+ super.receiveNoWait();
if (session.modeStop) return null;
-
- return session.connection.queueReceiveNoWait(queue);
+ return session.connection.queueReceive(queue,-1);
}
- public void setMessageListener(MessageListener listener) throws JMSException
+ public void setMessageListener(MessageListener listener) throws JMSException
{
super.setMessageListener(listener);
messageListener=listener;
setListening(listener!=null);
}
-
- //---
+ //---
void setListening(boolean newvalue) throws JMSException
{
if (newvalue==listening) return;
listening=newvalue;
- if (listening) sessionQueue.changeNumListening(1);
- else sessionQueue.changeNumListening(-1);
+
+ session.getConnection().listenerChange(queue);
}
+    /**
+     * Called by the ConnectionReceiver which has just received a message - in
+     * the Queue case only.
+     *
+     * Outdated messages are dropped silently. With a message listener the
+     * message is delivered to it right away and then recorded (transacted
+     * session) or acknowledged (AUTO / DUPS_OK). Without a listener the
+     * message is queued for the thread blocked in receive(), which is then
+     * notified.
+     *
+     * @param mes message to dispatch
+     * @throws NoReceiverException when the session is closed or stopped, the
+     *         receiver is not listening, nobody waits in receive(), or a
+     *         message is already pending for the waiting receive()
+     * @throws JMSException if delivery bookkeeping fails
+     */
+    public void dispatchMessage(SpyMessage mes) throws JMSException
+    {
+        if (session.closed)
+            throw new NoReceiverException("The session is closed");
+        if (session.modeStop)
+            throw new NoReceiverException("The session is stopped");
+        if (mes.isOutdated())
+            return;
+
+        if (messageListener != null) {
+            // Asynchronous delivery
+            if (!isListening())
+                throw new NoReceiverException("The receiver is not longer listening!");
+
+            //Set the session in the message so it can acknowlege
+            mes.setSpySession(session);
+            messageListener.onMessage(mes);
+
+            if (session.transacted) {
+                messagesConsumed.addLast(mes.getJMSMessageID());
+            } else if (session.acknowledgeMode==session.AUTO_ACKNOWLEDGE
+                    || session.acknowledgeMode==session.DUPS_OK_ACKNOWLEDGE) {
+                mes.doAcknowledge();
+            }
+            return;
+        }
+
+        // Synchronous delivery: hand the message to the thread blocked in receive()
+        synchronized (messages) {
+            if (!waitInReceive) {
+                Log.notice("Message did not arrive in time for the receive!");
+                throw new NoReceiverException("Message did not arrive in time for the receive!");
+            }
+            if (messages.size()!=0) {
+                Log.notice("Got too many messages for one receive.!");
+                throw new NoReceiverException("Got too many messages for one receive.!");
+            }
+            addMessage(mes);
+            messages.notify();
+        }
+    }
+
+ /**
+ * Reports the listening flag of this receiver, as last set by setListening().
+ *
+ * @return the current value of the listening field
+ */
+ public boolean isListening() {
+ return listening;
+ }
}
1.5 +2 -8 spyderMQ/src/java/org/spydermq/SpyQueueSession.java
Index: SpyQueueSession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueSession.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- SpyQueueSession.java 2000/11/14 06:15:53 1.4
+++ SpyQueueSession.java 2000/11/19 19:59:57 1.5
@@ -23,7 +23,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public class SpyQueueSession
extends SpySession
@@ -68,7 +68,7 @@
if (closed) throw new IllegalStateException("The session is closed");
SpyQueueReceiver receiver=new SpyQueueReceiver(this,queue);
- SessionQueue sessionQueue=addConsumer(queue,receiver);
+ addConsumer(queue,receiver);
return receiver;
}
@@ -95,13 +95,7 @@
return ((SpyQueueConnection)connection).getTemporaryQueue();
}
- //Not part of the spec
-
- //Called by the ConnectionReceiver object : put a new msg in the receiver's
queue
- public void dispatchMessage(Destination dest,SpyMessage mes) throws
JMSException
- {
- //Done in the SessionQueue :)
- }
+
// Package protected ---------------------------------------------
1.12 +97 -100 spyderMQ/src/java/org/spydermq/SpySession.java
Index: SpySession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpySession.java,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -r1.11 -r1.12
--- SpySession.java 2000/11/14 06:15:53 1.11
+++ SpySession.java 2000/11/19 19:59:57 1.12
@@ -29,7 +29,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.11 $
+ * @version $Revision: 1.12 $
*/
public class SpySession
implements Runnable, Session
@@ -45,8 +45,7 @@
private MessageListener messageListener;
//The connection object to which this session is linked
protected SpyConnection connection;
- //HashMap of SessionQueue by Destination
- public HashMap destinations;
+
//The outgoing message queue
protected LinkedList outgoingQueue;
//The outgoing message queue for messages that have been commited (if the
session is transacted)
@@ -61,15 +60,17 @@
public boolean alphaMode;
// Should we do client side persistence?
public boolean clientPersistence = true;
+ //MessageConsumers created by this session
+ protected HashSet consumers;
// Constructor ---------------------------------------------------
SpySession(SpyConnection conn, boolean trans, int acknowledge, boolean stop)
{
+
connection=conn;
transacted=trans;
acknowledgeMode=acknowledge;
- destinations=new HashMap();
outgoingQueue=new LinkedList();
outgoingCommitedQueue=new LinkedList();
modeStop=stop;
@@ -77,15 +78,17 @@
closed=false;
mutex=new Mutex();
alphaMode=true;
+ consumers = new HashSet();
//Start my thread
- Thread oneThread=new Thread(this);
+ Thread oneThread=new Thread(this, "SpySession");
oneThread.setDaemon(true);
oneThread.start();
//Wait for the thread to sleep
mutex.waitLocked();
+
}
// Public --------------------------------------------------------
@@ -231,18 +234,19 @@
Log.error(e);
}
}
-
- //if we are not in stopped mode, look at the incoming queue
-
- if (!modeStop) {
-
- Collection values = destinations.values();
- Iterator i=values.iterator();
- while (i.hasNext()) {
- SessionQueue
sessionQueue=(SessionQueue)i.next();
- doneJob=doneJob||sessionQueue.deliverMessage();
+
+ try {
+ //if we are not in stopped mode, look at the incoming
queue
+ if (!modeStop) {
+ Iterator i=consumers.iterator();
+ while (i.hasNext()) {
+ SpyMessageConsumer
mc=(SpyMessageConsumer)i.next();
+ doneJob=doneJob||mc.deliverMessage();
+ }
}
-
+ } catch (JMSException e) {
+ Log.log("Cannot receive a message from the
provider...");
+ Log.error(e);
}
//If there was smthg to do, try again
@@ -273,22 +277,16 @@
//notify the sleeping synchronous listeners
- Collection values = destinations.values();
- Iterator i=values.iterator();
+ Iterator i=consumers.iterator();
while (i.hasNext()) {
- SessionQueue sessionQueue=(SessionQueue)i.next();
- sessionQueue.close();
+ SpyMessageConsumer
messageConsumer=(SpyMessageConsumer)i.next();
+ messageConsumer.close();
}
connection.sessionClosing(this);
}
- //Called by the ConnectionReceiver which has just received a message
- public void dispatchMessage(Destination dest, SpyMessage mes) throws
JMSException
- {
- //The job is done in inherited classes - in the SPyTopicSession only
- throw new RuntimeException("pure virtual call");
- }
+
//Commit a transacted session
public synchronized void commit() throws JMSException
@@ -328,16 +326,11 @@
//Move the outgoing messages from the outgoingQueue to the
outgoingCommitedQueue
outgoingCommitedQueue.addAll(outgoingQueue);
outgoingQueue.clear();
+
+ //Acknowlege all consumed messages
+ SpyAcknowledgementItem items[] = removeAcknowledgementItems();
+ connection.acknowledge(items, true);
- //Notify each SessionQueue that we are going to commit
- Collection values = destinations.values();
- Iterator i=values.iterator();
- while (i.hasNext()) {
- SessionQueue sessionQueue=(SessionQueue)i.next();
- sessionQueue.commit();
- }
-
-
//We have finished our work, we can wake up the thread
modeStop=modeSav;
mutex.notifyLock();
@@ -365,13 +358,9 @@
//Clear the outgoing queue
outgoingQueue.clear();
- //Notify each SessionQueue that we are going to rollback
- Collection values = destinations.values();
- Iterator i=values.iterator();
- while (i.hasNext()) {
- SessionQueue sessionQueue=(SessionQueue)i.next();
- sessionQueue.recover();
- }
+ //Neg Acknowlege all consumed messages
+ SpyAcknowledgementItem items[] = removeAcknowledgementItems();
+ connection.acknowledge(items, false);
//We have finished our work, we can wake up the thread
modeStop=modeSav;
@@ -393,14 +382,10 @@
synchronized (mutex) {
mutex.waitToSleep();
-
- //Notify each SessionQueue that we are going to recover
- Collection values = destinations.values();
- Iterator i=values.iterator();
- while (i.hasNext()) {
- SessionQueue sessionQueue=(SessionQueue)i.next();
- sessionQueue.recover();
- }
+
+ //Neg Acknowlege all consumed messages
+ SpyAcknowledgementItem items[] = removeAcknowledgementItems();
+ connection.acknowledge(items, false);
//We have finished our work, we can wake up the thread
modeStop=modeSav;
@@ -414,64 +399,16 @@
{
Log.log("SpySession: deleteDestination(dest="+dest.toString()+")");
- //Remove it from the subscribers list
- synchronized (destinations) {
- HashMap newMap=(HashMap)destinations.clone();
+ synchronized (consumers) {
+ HashSet newMap=(HashSet)consumers.clone();
newMap.remove(dest);
- destinations=newMap;
+ consumers=newMap;
}
//We could look at our incoming and outgoing queues to drop messages
}
- // Package protected ---------------------------------------------
-
- SessionQueue addConsumer(Destination dest, SpyMessageConsumer who) throws
JMSException
- {
- if (closed) throw new IllegalStateException("The session is closed");
-
- Log.log("Session:
subscribe(dest="+dest.toString()+",MessageConsumer="+who.toString()+")");
-
- synchronized (destinations) {
- SessionQueue sub=(SessionQueue)destinations.get(dest);
- if (sub==null) {
- sub=new SessionQueue(this,dest);
- sub.addConsumer(who);
- HashMap newDestinations=(HashMap)destinations.clone();
- newDestinations.put(dest,sub);
- destinations=newDestinations;
- connection.addSession(dest,this);
- } else {
- sub.addConsumer(who);
- }
- return sub;
- }
- }
-
- void removeConsumer(Destination dest, SpyMessageConsumer who) throws
JMSException
- {
- Log.log("Session:
removeConsumer(Destination="+dest.toString()+",MessageConsumer="+who.toString()+")");
-
- synchronized (destinations) {
- SessionQueue sub=(SessionQueue)destinations.get(dest);
- if (sub!=null) {
- boolean empty=sub.removeConsumer(who);
- if (empty) {
- HashMap
newDestinations=(HashMap)destinations.clone();
- newDestinations.remove(dest);
- destinations=newDestinations;
- connection.removeSession(dest,this);
- }
- } else {
- //this should not happen
- HashMap newDestinations=(HashMap)destinations.clone();
- newDestinations.remove(dest);
- destinations=newDestinations;
- }
- }
- }
-
String getNewMessageID() throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
@@ -503,4 +440,64 @@
}
+ void removeConsumer(Destination dest, SpyMessageConsumer who) throws
JMSException
+ {
+ Log.log("Session:
removeConsumer(Destination="+dest.toString()+",MessageConsumer="+who.toString()+")");
+
+ consumers.remove( who );
+
+ connection.removeConsumer(dest, who );
+ }
+
+ void addConsumer(Destination dest, SpyMessageConsumer who) throws JMSException
+ {
+ if (closed) throw new IllegalStateException("The session is closed");
+
+ Log.log("Session:
subscribe(dest="+dest.toString()+",MessageConsumer="+who.toString()+")");
+ connection.addConsumer(dest, who);
+
+ consumers.add( who );
+
+ }
+
+
+ /**
+ * @return org.spydermq.SpyConnection
+ */
+ public SpyConnection getConnection() {
+ return connection;
+ }
+
+
+ //Get all the messages that have been consumed but need acks
+ private SpyAcknowledgementItem[] removeAcknowledgementItems()
+ {
+ //Count the messages
+ int i=0;
+ Iterator iter=consumers.iterator();
+ while (iter.hasNext()) {
+ SpyMessageConsumer mc=(SpyMessageConsumer)iter.next();
+ i += mc.messagesConsumed.size();
+ }
+
+ //Now fill the array
+ SpyAcknowledgementItem items[] = new SpyAcknowledgementItem[i];
+ i=0;
+ iter=consumers.iterator();
+ while (iter.hasNext()) {
+ SpyMessageConsumer mc=(SpyMessageConsumer)iter.next();
+ Iterator mesIter = mc.messagesConsumed.iterator();
+ while(mesIter.hasNext()) {
+ String messageId = (String)mesIter.next();
+ SpyAcknowledgementItem item = new
SpyAcknowledgementItem();
+ item.jmsDestination = mc.destination;
+ item.jmsMessageID = messageId;
+ items[i++] = item;
+ }
+ mc.messagesConsumed.clear();
+ }
+
+ return items;
+ }
+
}
1.7 +1 -24 spyderMQ/src/java/org/spydermq/SpyTopicSession.java
Index: SpyTopicSession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSession.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- SpyTopicSession.java 2000/11/14 06:15:54 1.6
+++ SpyTopicSession.java 2000/11/19 19:59:57 1.7
@@ -28,7 +28,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
*/
public class SpyTopicSession
extends SpySession
@@ -63,7 +63,7 @@
if (closed) throw new IllegalStateException("The session is closed");
SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,noLocal);
- SessionQueue sessionQueue=addConsumer(topic,sub);
+ addConsumer(topic,sub);
if (messageSelector!=null) {
Selector selector=new Selector(messageSelector);
@@ -108,30 +108,7 @@
}
- // - Package protected ---------------------------------------------
- // - Not part of the spec
-
- //Called by the ConnectionReceiver object : put a new msg in the receiver's
queue
- public void dispatchMessage(Destination dest,SpyMessage mes) throws
JMSException
- {
- if (closed) throw new IllegalStateException("The session is closed");
-
- Log.log("Session:
dispatchMessage(Destination="+dest.toString()+",Mes="+mes.toString()+")");
-
- if (mes.isOutdated()) return;
- //Get the SessionQueue for this Destination
- SessionQueue sessionQueue=(SessionQueue)destinations.get(dest);
- if (sessionQueue==null) return;
-
- //Work on the set of SpyTopicSubscriber for this topic
- Iterator i=sessionQueue.subscribers.iterator();
- while (i.hasNext()) {
- SpyTopicSubscriber sub=(SpyTopicSubscriber)i.next();
- sub.addMessage(mes);
- }
-
- }
//called by a MessageProducer object which needs to publish a message
1.6 +8 -7 spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java
Index: SpyTopicSubscriber.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- SpyTopicSubscriber.java 2000/06/14 19:20:19 1.5
+++ SpyTopicSubscriber.java 2000/11/19 19:59:57 1.6
@@ -19,7 +19,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.5 $
+ * @version $Revision: 1.6 $
*/
public class SpyTopicSubscriber
extends SpyMessageConsumer
@@ -34,9 +34,10 @@
// Constructor ---------------------------------------------------
- SpyTopicSubscriber(SpyTopicSession session,Topic topic,boolean local)
+ SpyTopicSubscriber(SpyTopicSession session,Topic topic,boolean local)
{
super(session);
+ destination=topic;
this.topic=topic;
this.local=local;
}
@@ -49,7 +50,7 @@
return topic;
}
- public boolean getNoLocal() throws JMSException
+ public boolean getNoLocal() throws JMSException
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
return local;
@@ -74,7 +75,7 @@
}
}
- public Message receive() throws JMSException
+ public Message receive() throws JMSException
{
super.receive();
@@ -105,7 +106,7 @@
}
}
- public Message receive(long timeOut) throws JMSException
+ public Message receive(long timeOut) throws JMSException
{
super.receive(timeOut);
@@ -144,7 +145,7 @@
}
- public Message receiveNoWait() throws JMSException
+ public Message receiveNoWait() throws JMSException
{
super.receiveNoWait();
@@ -158,7 +159,7 @@
}
}
- public void setMessageListener(MessageListener listener) throws JMSException
+ public void setMessageListener(MessageListener listener) throws JMSException
{
super.setMessageListener(listener);
1.1 spyderMQ/src/java/org/spydermq/JMSServerQueueReceiver.java
Index: JMSServerQueueReceiver.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import java.util.HashMap;
import java.util.Iterator;
import java.io.Serializable;
import org.spydermq.distributed.interfaces.ConnectionReceiver;
import org.spydermq.distributed.interfaces.ConnectionReceiverSetup;
/**
* This class manages a connection receiver for a JMSServerQueue.
* Keeps track of listening state and unacknowleged messages.
*
*@author Hiram Chirino ([EMAIL PROTECTED])
*
*@version $Revision: 1.1 $
*/
public class JMSServerQueueReceiver implements Serializable {
// The queue messages will be comming from
public JMSServerQueue jmsSeverQueue;
// The connection mewssages will be going to
public SpyDistributedConnection dc;
// Used to know if the connection is accepting messages.
public boolean listeners;
public int receiveReuquests;
// Keeps track of the unacknowledged messages send to the connection.
public transient HashMap unacknowledgedMessages;
// Consturctor ---------------------------------------------------
public JMSServerQueueReceiver(JMSServerQueue serverQueue,
SpyDistributedConnection sdc) {
jmsSeverQueue = serverQueue;
dc = sdc;
listeners = false;
receiveReuquests = 0;
unacknowledgedMessages = new HashMap();
Log.log("A ServerQueueReceiver has been created for : " +
serverQueue.destination + "/" + dc.getClientID());
}
void acknowledge(String messageId, boolean ack) {
SpyMessage m;
synchronized (unacknowledgedMessages) {
m = (SpyMessage) unacknowledgedMessages.remove(messageId);
}
if (m == null)
return;
if (!jmsSeverQueue.isTopic) {
// Not sure how we should handle the topic case.
// On a negative acknowledge, we don't want to
// add it back to the topic since other
// receivers might get a duplicate duplicate message.
} else {
// Was it a negative acknowledge??
if (!ack) {
Log.log("Restoring message: " + m.messageId);
jmsSeverQueue.restoreMessage(m);
} else {
Log.log("Message Ack: " + m.messageId);
}
}
}
// The connection is accepting new messages if there
// is a listener or if the receiver has requested a message.
public boolean isListening() {
return listeners || receiveReuquests > 0;
}
// Called when we send one message.
public void removeReceiver() {
if (receiveReuquests == 0)
return;
receiveReuquests--;
if (receiveReuquests == 0 && !listeners) {
jmsSeverQueue.listeners--;
}
}
// This method gets invoked as a result of a receive() method call
// on a QueueReceiver
public void addReceiver(long wait) {
// TODO: figure out a way to make a reciver eligable for
// wait amount of time.
receiveReuquests++;
if (receiveReuquests == 1 && !listeners) {
jmsSeverQueue.listeners++;
}
}
public void setListening(boolean value) {
if (value == listeners)
return;
listeners = value;
if (value && receiveReuquests == 0)
jmsSeverQueue.listeners++;
if (!value && receiveReuquests == 0)
jmsSeverQueue.listeners--;
}
void sendMultipleMessages(SpyMessage mes[]) throws Exception {
Log.log("DISPATCH: " + mes.length + " messages => " +
dc.getClientID());
if (!jmsSeverQueue.isTopic) {
synchronized (unacknowledgedMessages) {
for (int i = 0; i < mes.length; i++) {
unacknowledgedMessages.put(mes[i].getJMSMessageID(), mes[i]);
}
}
}
dc.cr.receiveMultiple(jmsSeverQueue.destination, mes);
}
void sendOneMessage(SpyMessage mes) throws Exception {
Log.log("DISPATCH: Message: " + mes + " => " + dc.getClientID());
if (!jmsSeverQueue.isTopic) {
synchronized (unacknowledgedMessages) {
unacknowledgedMessages.put(mes.getJMSMessageID(), mes);
}
}
removeReceiver();
dc.cr.receive(jmsSeverQueue.destination, mes);
}
public void close() {
Log.log("A ServerQueueReceiver has been closed: " +
jmsSeverQueue.destination + "/" + dc.getClientID());
if (isListening())
jmsSeverQueue.listeners--;
synchronized (unacknowledgedMessages) {
Iterator iter = unacknowledgedMessages.values().iterator();
Log.log("Restoring " + unacknowledgedMessages.size() + "
messages");
while (iter.hasNext()) {
SpyMessage m = (SpyMessage) iter.next();
jmsSeverQueue.restoreMessage(m);
iter.remove();
}
}
}
}
1.1 spyderMQ/src/java/org/spydermq/SpyAcknowledgementItem.java
Index: SpyAcknowledgementItem.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import java.io.Serializable;
import javax.jms.Destination;
import java.lang.Comparable;
/**
 * Used to acknowledge sent messages.
 *
 * This class holds the minimum amount of information needed to
 * identify a message to the JMSServer: the destination and the
 * message ID. Both fields start out as <code>null</code>.
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
public class SpyAcknowledgementItem implements Serializable {

    /** Destination associated with the message being acknowledged. */
    public Destination jmsDestination;

    /** JMS message ID of the message being acknowledged. */
    public String jmsMessageID;
}