User: norbert
Date: 00/05/24 12:17:19
Modified: src/java/org/spyderMQ SessionQueue.java
SpyMessageConsumer.java SpyQueueReceiver.java
SpyQueueSession.java SpySession.java
SpyTopicSession.java SpyTopicSubscriber.java
Log:
Changes to the Pub/Sub System in order to implement P2P.
Now, a SessionQueue is specific to an unique destination.
This is a way to code 'local optimisations' too
Added bug (I think) in acknowledge()
Selectors :
Remove the .getClass().getName() [ thx Rich ]
Revision Changes Path
1.15 +74 -135 spyderMQ/src/java/org/spyderMQ/SessionQueue.java
Index: SessionQueue.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SessionQueue.java,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -r1.14 -r1.15
--- SessionQueue.java 2000/05/18 20:20:57 1.14
+++ SessionQueue.java 2000/05/24 19:17:18 1.15
@@ -11,6 +11,8 @@
import javax.jms.JMSException;
import javax.jms.Session;
import java.util.LinkedList;
+import java.util.HashSet;
+import java.util.Iterator;
import org.spydermq.selectors.Selector;
/**
@@ -18,163 +20,65 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.14 $
+ * @version $Revision: 1.15 $
*/
public class SessionQueue
{
// Attributes ----------------------------------------------------
- //the MessageConsumer of this queue
- SpyMessageConsumer destination;
- //List of Pending messages (not yet delivered)
- LinkedList messages;
//List of messages waiting for acknoledgment
LinkedList messagesWaitingForAck;
- //Is the consumer sleeping in a receive() ?
- boolean waitInReceive;
//Is the session transacted ?
boolean transacted;
//What is the message acknowledgment mode ?
int acknowledgeMode;
+ //the MessageConsumers linked to this queue
+ HashSet subscribers;
// Constructor ---------------------------------------------------
- SessionQueue(boolean tr,int am)
+ SessionQueue(boolean transacted,int acknowledgeMode)
{
- messages=new LinkedList();
messagesWaitingForAck=new LinkedList();
- waitInReceive=false;
- transacted=tr;
- acknowledgeMode=am;
+ subscribers=new HashSet();
+ this.transacted=transacted;
+ this.acknowledgeMode=acknowledgeMode;
}
// Package protected ---------------------------------------------
- void setConsumer(SpyMessageConsumer consumer)
- {
- destination=consumer;
- }
- void addMessage(SpyMessage mes) throws JMSException
- {
- synchronized (messages) {
- //Add a message to the queue
-
- //Test the priority
- int pri=mes.getJMSPriority();
-
- if (pri<=4) {
- //normal priority message
- messages.addLast(mes);
- } else {
- //expedited priority message
- int size=messages.size();
- int i=0;
- for(;i<size;i++) {
- if
(((SpyMessage)messages.get(i)).getJMSPriority()<pri) break;
- }
- messages.add(i,mes);
- }
-
- }
- }
-
//Send a message from the queue to a MessageConsumer
boolean deliverMessage()
{
- synchronized (messages) {
+ boolean result=false;
+ Iterator i=subscribers.iterator();
+
+ while (i.hasNext()) {
- try {
-
- if (messages.size()==0) return false;
+ SpyMessageConsumer consumer=(SpyMessageConsumer)i.next();
+
+ synchronized (consumer.messages) {
- MessageListener
listener=destination.getMessageListener();
-
- if (listener==null) {
- if (!waitInReceive) return false;
- messages.notify();
+ if (consumer.messages.size()==0) continue;
+
+ if (consumer.messageListener==null) {
+ if (!consumer.waitInReceive) continue;
+ consumer.messages.notify();
} else {
- SpyMessage
mes=getMessage(destination.selector);
+ SpyMessage mes=consumer.getMessage();
if (mes==null) return false;
- listener.onMessage(mes);
+ consumer.messageListener.onMessage(mes);
}
- return true;
- } catch (JMSException e) {
- Log.error(e);
- return false;
+ result=true;
+
}
-
- }
- }
-
- SpyMessage getMessage(Selector selector)
- {
- synchronized (messages) {
-
- while (true) {
-
- try {
- if (messages.size()==0) return null;
-
- SpyMessage
mes=(SpyMessage)messages.removeFirst();
-
- if (mes.isOutdated()) {
- Log.log("SessionQueue: I dropped a
message (timeout)");
- continue;
- }
-
- if (selector!=null) {
- if (!selector.test(mes)) {
- Log.log("SessionQueue: I
dropped a message (selector)");
- continue;
- } else {
- Log.log("SessionQueue:
selector evaluates TRUE");
- }
- }
-
- //the SAME Message object is put in different
SessionQueues
- //when we deliver it, we have to clone() it to
insure independance
- SpyMessage message=mes.myClone();
-
- if (!transacted) {
- if
(acknowledgeMode==Session.CLIENT_ACKNOWLEDGE) {
-
- synchronized
(messagesWaitingForAck) {
- //Put the message in
the messagesWaitForAck queue
-
messagesWaitingForAck.addLast(message);
- }
-
- message.setSessionQueue(this);
-
- } else if
(acknowledgeMode==Session.DUPS_OK_ACKNOWLEDGE) {
- //DUPS_OK_ACKNOWLEDGE
- } else {
- //AUTO_ACKNOWLEDGE
- //we don't need to keep this
message in a queue
- }
- } else {
-
- //We are linked to a transacted
session
-
- synchronized (messagesWaitingForAck) {
- //Put the message in the
messagesWaitForAck queue
-
messagesWaitingForAck.addLast(message);
- }
-
- }
-
- return message;
-
- } catch (Exception e) {
- Log.error(e);
- }
-
- }
-
+
}
+
+ return result;
}
-
//A message has been acknowledged
void acknowledge(SpyMessage mes)
@@ -194,26 +98,41 @@
}
+ //notify the sleeping synchronous listeners
+ void close() throws JMSException
+ {
+ Iterator i=subscribers.iterator();
+
+ while (i.hasNext()) {
+ SpyMessageConsumer consumer=(SpyMessageConsumer)i.next();
+ consumer.close();
+ }
+
+ }
+
//the session is about to recover
void recover() throws JMSException
{
- synchronized (messages) {
- synchronized (messagesWaitingForAck) {
+ synchronized (messagesWaitingForAck) {
- while (messagesWaitingForAck.size()!=0) {
-
- //Get the most recent unacknowledged message
- SpyMessage
mes=(SpyMessage)messagesWaitingForAck.removeLast();
+ while (messagesWaitingForAck.size()!=0) {
- //This message is redelivered
- mes.setJMSRedelivered(true);
+ //Get the most recent unacknowledged message
+ SpyMessage
mes=(SpyMessage)messagesWaitingForAck.removeLast();
- //Put it in the incoming queue
- messages.addFirst(mes);
+ //This message is redelivered
+ mes.setJMSRedelivered(true);
+ //Put the message in one incoming queue - Is it what
the spec says ?
+ Iterator i=subscribers.iterator();
+
+ if (i.hasNext()) {
+ SpyMessageConsumer
consumer=(SpyMessageConsumer)i.next();
+ consumer.addMessage(mes);
}
-
+
}
+
}
}
@@ -225,5 +144,25 @@
}
}
+ void notifyReceiverStopped(boolean newMode)
+ {
+ //For Queues. Not yet implemented -
+ }
+
+ synchronized void addConsumer(SpyMessageConsumer consumer)
+ {
+ consumer.setSessionQueue(this);
+ HashSet newSet=(HashSet)subscribers.clone();
+ newSet.add(consumer);
+ subscribers=newSet;
+ }
+ synchronized boolean removeConsumer(MessageConsumer consumer)
+ {
+ HashSet newSet=(HashSet)subscribers.clone();
+ newSet.remove(consumer);
+ subscribers=newSet;
+ return subscribers.size()==0;
+ }
+
}
1.12 +128 -7 spyderMQ/src/java/org/spyderMQ/SpyMessageConsumer.java
Index: SpyMessageConsumer.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyMessageConsumer.java,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -r1.11 -r1.12
--- SpyMessageConsumer.java 2000/05/20 02:34:00 1.11
+++ SpyMessageConsumer.java 2000/05/24 19:17:18 1.12
@@ -10,16 +10,17 @@
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.Message;
+import javax.jms.Session;
import java.util.LinkedList;
import java.util.Date;
import org.spydermq.selectors.Selector;
/**
- * This class implements javax.jms.MessageConsumer - Going to be deprecated
+ * This class implements javax.jms.MessageConsumer
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.11 $
+ * @version $Revision: 1.12 $
*/
public class SpyMessageConsumer
implements MessageConsumer
@@ -29,13 +30,19 @@
//Link to my session
protected SpySession session;
//My message listener (null if none)
- public MessageListener messageListener;
+ MessageListener messageListener;
//Am I closed ?
protected boolean closed;
//Do I have a selector
public Selector selector;
//The message selector
public String messageSelector;
+ //A link to my session queue (in my session)
+ protected SessionQueue sessionQueue;
+ //List of Pending messages (not yet delivered)
+ LinkedList messages;
+ //Is the consumer sleeping in a receive() ?
+ boolean waitInReceive;
// Constructor ---------------------------------------------------
@@ -46,8 +53,15 @@
closed=false;
selector=null;
messageSelector=null;
+ messages=new LinkedList();
+ waitInReceive=false;
}
+ void setSessionQueue(SessionQueue sessionQueue)
+ {
+ this.sessionQueue=sessionQueue;
+ }
+
// Public --------------------------------------------------------
public String getMessageSelector() throws JMSException
@@ -67,29 +81,136 @@
public void setMessageListener(MessageListener listener) throws JMSException
{
//Job is done in the inherited classes
+ //The QueueReceiver object need to notify their session / connection /
the broker
+ throw new RuntimeException("pure virtual call");
}
public Message receive() throws JMSException
{
//Job is done in the inherited classes
- return null;
+ //The QueueReceiver object need to notify their session / connection /
the broker
+ throw new RuntimeException("pure virtual call");
}
public Message receive(long timeOut) throws JMSException
{
//Job is done in the inherited classes
- return null;
+ //The QueueReceiver object need to notify their session / connection /
the broker
+ throw new RuntimeException("pure virtual call");
}
public Message receiveNoWait() throws JMSException
{
//Job is done in the inherited classes
- return null;
+ //The QueueReceiver object need to notify their session / connection /
the broker
+ throw new RuntimeException("pure virtual call");
}
public synchronized void close() throws JMSException
+ {
+ //Job is done in the inherited classes
+ //The QueueReceiver object need to notify their session / connection /
the broker
+ throw new RuntimeException("pure virtual call");
+ }
+
+ //Package protected - Not part of the spec
+
+ void setSelector(Selector selector,String messageSelector)
+ {
+ this.selector=selector;
+ this.messageSelector=messageSelector;
+ }
+
+ SpyMessage getMessage()
+ {
+ synchronized (messages) {
+
+ while (true) {
+
+ try {
+ if (messages.size()==0) return null;
+
+ SpyMessage
mes=(SpyMessage)messages.removeFirst();
+
+ if (mes.isOutdated()) {
+ Log.log("SessionQueue: I dropped a
message (timeout)");
+ continue;
+ }
+
+ if (selector!=null) {
+ if (!selector.test(mes)) {
+ Log.log("SessionQueue: I
dropped a message (selector)");
+ continue;
+ } else {
+ Log.log("SessionQueue:
selector evaluates TRUE");
+ }
+ }
+
+ //the SAME Message object is put in different
SessionQueues
+ //when we deliver it, we have to clone() it to
insure independance
+ SpyMessage message=mes.myClone();
+
+ if (!session.transacted) {
+ if
(session.acknowledgeMode==Session.CLIENT_ACKNOWLEDGE) {
+
+ synchronized
(sessionQueue.messagesWaitingForAck) {
+ //Put the message in
the messagesWaitForAck queue
+
sessionQueue.messagesWaitingForAck.addLast(message);
+ }
+
+
message.setSessionQueue(sessionQueue);
+
+ } else if
(sessionQueue.acknowledgeMode==Session.DUPS_OK_ACKNOWLEDGE) {
+ //DUPS_OK_ACKNOWLEDGE
+ } else {
+ //AUTO_ACKNOWLEDGE
+ //we don't need to keep this
message in a queue
+ }
+ } else {
+
+ //We are linked to a transacted
session
+
+ synchronized
(sessionQueue.messagesWaitingForAck) {
+ //Put the message in the
messagesWaitForAck queue
+
sessionQueue.messagesWaitingForAck.addLast(message);
+ }
+
+ }
+
+ return message;
+
+ } catch (Exception e) {
+ Log.error(e);
+ }
+
+ }
+
+ }
+
+ }
+
+ void addMessage(SpyMessage mes) throws JMSException
{
- //Job is done in the inherited classes
+ synchronized (messages) {
+ //Add a message to the queue
+
+ //Test the priority
+ int pri=mes.getJMSPriority();
+
+ if (pri<=4) {
+ //normal priority message
+ messages.addLast(mes);
+ } else {
+ //expedited priority message
+ int size=messages.size();
+ int i=0;
+ for(;i<size;i++) {
+ if
(((SpyMessage)messages.get(i)).getJMSPriority()<pri) break;
+ }
+ messages.add(i,mes);
+ }
+
+ }
}
}
1.3 +5 -5 spyderMQ/src/java/org/spyderMQ/SpyQueueReceiver.java
Index: SpyQueueReceiver.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyQueueReceiver.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- SpyQueueReceiver.java 2000/05/24 02:56:41 1.2
+++ SpyQueueReceiver.java 2000/05/24 19:17:18 1.3
@@ -17,7 +17,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class SpyQueueReceiver
extends SpyMessageConsumer
@@ -32,7 +32,7 @@
// Constructor ---------------------------------------------------
- SpyQueueReceiver(SpyQueueSession s,SessionQueue sq,Queue q,boolean stop)
+ SpyQueueReceiver(SpyQueueSession s,Queue q,boolean stop)
{
super(s);
queue=q;
@@ -93,11 +93,11 @@
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
-/* if (listener==null) {
- mySessionQueue.notifyReceiverStopped(false);
+ if (listener==null) {
+ sessionQueue.notifyReceiverStopped(false);
} else {
- mySessionQueue.notifyReceiverStopped(true);
- }*/
+ sessionQueue.notifyReceiverStopped(true);
+ }
messageListener=listener;
1.8 +5 -15 spyderMQ/src/java/org/spyderMQ/SpyQueueSession.java
Index: SpyQueueSession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyQueueSession.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- SpyQueueSession.java 2000/05/24 02:56:41 1.7
+++ SpyQueueSession.java 2000/05/24 19:17:18 1.8
@@ -23,7 +23,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.7 $
+ * @version $Revision: 1.8 $
*/
public class SpyQueueSession
extends SpySession
@@ -65,8 +65,10 @@
public QueueReceiver createReceiver(Queue queue) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
-
- SpyQueueReceiver receiver=new
SpyQueueReceiver(this,null,queue,modeStop);
+
+ SpyQueueReceiver receiver=new SpyQueueReceiver(this,queue,modeStop);
+ SessionQueue sessionQueue=addConsumer(queue,receiver);
+
return receiver;
}
@@ -144,19 +146,7 @@
{
thread.notify();
}
- }
-
- void addConsumer(Destination dest, SessionQueue who) throws JMSException
- {
- if (closed) throw new IllegalStateException("The session is closed");
-
- //Not implemented yet
- }
-
- void removeConsumer(Destination dest, SessionQueue who) throws JMSException
- {
- //Not implemented yet
- }
+ }
//One receiver is changing its mode
synchronized void notifyReceiverStopped(SpyQueueReceiver receiver,boolean
newMode)
1.16 +89 -67 spyderMQ/src/java/org/spyderMQ/SpySession.java
Index: SpySession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpySession.java,v
retrieving revision 1.15
retrieving revision 1.16
diff -u -r1.15 -r1.16
--- SpySession.java 2000/05/22 17:32:04 1.15
+++ SpySession.java 2000/05/24 19:17:18 1.16
@@ -28,7 +28,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.15 $
+ * @version $Revision: 1.16 $
*/
public class SpySession
implements Runnable, Session
@@ -37,21 +37,21 @@
// Attributes ----------------------------------------------------
//Is this session transacted ?
- protected boolean transacted=false;
+ protected boolean transacted;
//What is the type of acknowledgement ?
- protected int acknowledgeMode=AUTO_ACKNOWLEDGE;
+ protected int acknowledgeMode;
//The messageListener for this session
- private MessageListener messageListener=null;
+ private MessageListener messageListener;
//The connection object to which this session is linked
protected SpyConnection connection;
- //HashMap of (HashSet of SessionQueue) by Destination
- HashMap subscribers;
+ //HashMap of SessionQueue by Destination
+ HashMap destinations;
//The outgoing message queue
protected LinkedList outgoingQueue;
//The outgoing message queue for messages that have been commited (if the
session is transacted)
protected LinkedList outgoingCommitedQueue;
//Is my connection in stopped mode ?
- protected boolean modeStop=false;
+ protected boolean modeStop;
//Is the session closed ?
boolean closed;
//This object is the object used to synchronize the session's thread
@@ -66,10 +66,11 @@
connection=conn;
transacted=trans;
acknowledgeMode=acknowledge;
- subscribers=new HashMap();
+ destinations=new HashMap();
outgoingQueue=new LinkedList();
outgoingCommitedQueue=new LinkedList();
modeStop=stop;
+ messageListener=null;
closed=false;
thread=new Integer(0);
this.isTopic=isTopic;
@@ -212,20 +213,16 @@
if (!modeStop) {
- Collection values = subscribers.values();
- Iterator i1=values.iterator();
- while (i1.hasNext()) {
- HashSet set=(HashSet)i1.next();
- Iterator i2=set.iterator();
- while (i2.hasNext()) {
- SessionQueue
sessionQueue=(SessionQueue)i2.next();
-
doneJob=doneJob||sessionQueue.deliverMessage();
- }
- }
-
+ Collection values = destinations.values();
+ Iterator i=values.iterator();
+ while (i.hasNext()) {
+ SessionQueue
sessionQueue=(SessionQueue)i.next();
+
doneJob=doneJob||sessionQueue.deliverMessage();
+ }
+
}
- //If there were smthg to do, try again
+ //If there was smthg to do, try again
if (doneJob&&!modeStop) continue;
try {
@@ -234,12 +231,11 @@
Log.log("SessionThread: I wake up");
} catch (InterruptedException e) {
}
-
+
}
}
}
- //CHECK for queues !
public synchronized void close() throws JMSException
{
if (closed) return;
@@ -253,16 +249,11 @@
//notify the sleeping synchronous listeners
- Collection values = subscribers.values();
- Iterator i1=values.iterator();
- while (i1.hasNext()) {
- HashSet set=(HashSet)i1.next();
- Iterator i2=set.iterator();
- while (i2.hasNext()) {
- SessionQueue sessionQueue=(SessionQueue)i2.next();
- //close() each SessionQueue (MessageConsumer) linked
to this Session
- sessionQueue.destination.close();
- }
+ Collection values = destinations.values();
+ Iterator i=values.iterator();
+ while (i.hasNext()) {
+ SessionQueue sessionQueue=(SessionQueue)i.next();
+ sessionQueue.close();
}
connection.sessionClosing(this);
@@ -272,6 +263,7 @@
public void dispatchMessage(Destination dest, SpyMessage mes) throws
JMSException
{
//The job is done in inherited classes
+ throw new RuntimeException("pure virtual call");
}
//Commit a transacted session
@@ -293,15 +285,11 @@
outgoingQueue.clear();
//Notify each SessionQueue that we are going to commit
- Collection values = subscribers.values();
- Iterator i1=values.iterator();
- while (i1.hasNext()) {
- HashSet set=(HashSet)i1.next();
- Iterator i2=set.iterator();
- while (i2.hasNext()) {
- SessionQueue
sessionQueue=(SessionQueue)i2.next();
- sessionQueue.commit();
- }
+ Collection values = destinations.values();
+ Iterator i=values.iterator();
+ while (i.hasNext()) {
+ SessionQueue sessionQueue=(SessionQueue)i.next();
+ sessionQueue.commit();
}
//We have finished our work, we can wake up the thread
@@ -329,15 +317,11 @@
outgoingQueue.clear();
//Notify each SessionQueue that we are going to rollback
- Collection values = subscribers.values();
- Iterator i1=values.iterator();
- while (i1.hasNext()) {
- HashSet set=(HashSet)i1.next();
- Iterator i2=set.iterator();
- while (i2.hasNext()) {
- SessionQueue
sessionQueue=(SessionQueue)i2.next();
- sessionQueue.recover();
- }
+ Collection values = destinations.values();
+ Iterator i=values.iterator();
+ while (i.hasNext()) {
+ SessionQueue sessionQueue=(SessionQueue)i.next();
+ sessionQueue.recover();
}
//We have finished our work, we can wake up the thread
@@ -360,15 +344,11 @@
synchronized (thread) {
//Notify each SessionQueue that we are going to recover
- Collection values = subscribers.values();
- Iterator i1=values.iterator();
- while (i1.hasNext()) {
- HashSet set=(HashSet)i1.next();
- Iterator i2=set.iterator();
- while (i2.hasNext()) {
- SessionQueue
sessionQueue=(SessionQueue)i2.next();
- sessionQueue.recover();
- }
+ Collection values = destinations.values();
+ Iterator i=values.iterator();
+ while (i.hasNext()) {
+ SessionQueue sessionQueue=(SessionQueue)i.next();
+ sessionQueue.recover();
}
//We have finished our work, we can wake up the thread
@@ -384,10 +364,10 @@
Log.log("SpySession: deleteDestination(dest="+dest.toString()+")");
//Remove it from the subscribers list
- synchronized (subscribers) {
- HashMap newMap=(HashMap)subscribers.clone();
+ synchronized (destinations) {
+ HashMap newMap=(HashMap)destinations.clone();
newMap.remove(dest);
- subscribers=newMap;
+ destinations=newMap;
}
//We could look at our incoming and outgoing queues to drop messages
@@ -395,11 +375,52 @@
// Package protected ---------------------------------------------
- void removeConsumer(Destination d, SessionQueue who) throws JMSException
+ SessionQueue addConsumer(Destination dest, SpyMessageConsumer who) throws
JMSException
{
- //The job is done in the inherited classes
- }
+ if (closed) throw new IllegalStateException("The session is closed");
+
+ Log.log("Session:
subscribe(dest="+dest.toString()+",MessageConsumer="+who.toString()+")");
+ synchronized (destinations) {
+ SessionQueue sub=(SessionQueue)destinations.get(dest);
+ if (sub==null) {
+ sub=new SessionQueue(transacted,acknowledgeMode);
+ sub.addConsumer(who);
+ HashMap newDestinations=(HashMap)destinations.clone();
+ newDestinations.put(dest,sub);
+ destinations=newDestinations;
+ connection.addSession(dest,this);
+ } else {
+ sub.addConsumer(who);
+ }
+ return sub;
+ }
+ }
+
+ void removeConsumer(Destination dest, SpyMessageConsumer who) throws
JMSException
+ {
+ Log.log("Session:
removeConsumer(Destination="+dest.toString()+",MessageConsumer="+who.toString()+")");
+
+ synchronized (destinations) {
+ SessionQueue sub=(SessionQueue)destinations.get(dest);
+ if (sub!=null) {
+ boolean empty=sub.removeConsumer(who);
+ if (empty) {
+ HashMap
newDestinations=(HashMap)destinations.clone();
+ newDestinations.remove(dest);
+ destinations=newDestinations;
+ connection.removeSession(dest,this);
+ }
+ } else {
+ //this should not happen
+ HashMap newDestinations=(HashMap)destinations.clone();
+ newDestinations.remove(dest);
+ destinations=newDestinations;
+ }
+ }
+
+ }
+
String getNewMessageID() throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
@@ -407,10 +428,11 @@
return connection.getNewMessageID();
}
- //The connection a changed its mode (stop() or start())
+ //The connection has changed its mode (stop() or start())
//We have to wait until message delivery has stopped or wake up the thread
void notifyStopMode(boolean newValue)
{
+
if (closed) throw new IllegalStateException("The session is closed");
if (modeStop==newValue) return;
@@ -422,7 +444,7 @@
synchronized (thread) {
;
}
-
+
} else {
//Wake up the thread
1.24 +11 -63 spyderMQ/src/java/org/spyderMQ/SpyTopicSession.java
Index: SpyTopicSession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyTopicSession.java,v
retrieving revision 1.23
retrieving revision 1.24
diff -u -r1.23 -r1.24
--- SpyTopicSession.java 2000/05/22 17:32:04 1.23
+++ SpyTopicSession.java 2000/05/24 19:17:18 1.24
@@ -24,7 +24,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.23 $
+ * @version $Revision: 1.24 $
*/
public class SpyTopicSession
extends SpySession
@@ -58,10 +58,8 @@
{
if (closed) throw new IllegalStateException("The session is closed");
- SessionQueue sessionQueue=new SessionQueue(transacted,acknowledgeMode);
- SpyTopicSubscriber sub=new
SpyTopicSubscriber(this,sessionQueue,topic,noLocal);
- sessionQueue.setConsumer(sub);
- addConsumer(topic,sessionQueue);
+ SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,noLocal);
+ SessionQueue sessionQueue=addConsumer(topic,sub);
if (messageSelector!=null) {
Selector selector=new Selector(messageSelector);
@@ -105,11 +103,10 @@
//Not implemented yet
}
- //overides SpySession
+ // - Package protected ---------------------------------------------
+ // - Not part of the spec
- //Not part of the spec
-
//Called by the ConnectionReceiver object : put a new msg in the receiver's
queue
public void dispatchMessage(Destination dest,SpyMessage mes) throws
JMSException
{
@@ -118,22 +115,20 @@
Log.log("Session:
dispatchMessage(Destination="+dest.toString()+",Mes="+mes.toString()+")");
if (mes.isOutdated()) return;
-
- //Get the set of SessionQueue (MessageConsumers) for this Destination
- HashSet set=(HashSet)subscribers.get(dest);
- if (set==null||set.isEmpty()) return;
- //Work on the set of consumers for this topic
+ //Get the SessionQueue for this Destination
+ SessionQueue sessionQueue=(SessionQueue)destinations.get(dest);
+ if (sessionQueue==null) return;
- Iterator i=set.iterator();
+ //Work on the set of SpyTopicSubscriber for this topic
+ Iterator i=sessionQueue.subscribers.iterator();
while (i.hasNext()) {
- SessionQueue sub=(SessionQueue)i.next();
+ SpyTopicSubscriber sub=(SpyTopicSubscriber)i.next();
sub.addMessage(mes);
}
}
- // Package protected ---------------------------------------------
//called by a MessageProducer object which needs to publish a message
void sendMessage(SpyMessage m) throws JMSException
@@ -176,52 +171,5 @@
}
- void addConsumer(Destination dest, SessionQueue who) throws JMSException
- {
- if (closed) throw new IllegalStateException("The session is closed");
-
- Log.log("TopicSession:
subscribe(dest="+dest.toString()+",QueueSession="+who.toString()+")");
-
- synchronized (subscribers) {
- HashSet sub=(HashSet)subscribers.get(dest);
- if (sub==null) {
- sub=new HashSet();
- sub.add(who);
- HashMap newSubscribers=(HashMap)subscribers.clone();
- newSubscribers.put(dest,sub);
- subscribers=newSubscribers;
- connection.addSession(dest,this);
- } else {
- HashSet newSub=(HashSet)sub.clone();
- newSub.add(who);
- subscribers.put(dest,newSub);
- }
- }
- }
-
- void removeConsumer(Destination dest, SessionQueue who) throws JMSException
- {
- Log.log("TopicSession:
removeConsumer(Destination="+dest.toString()+",QueueSession="+who.toString()+")");
-
- synchronized (subscribers) {
- HashSet sub=(HashSet)subscribers.get(dest);
- if (sub!=null) {
- HashSet newSub=(HashSet)sub.clone();
- newSub.remove(who);
- if (newSub.isEmpty()) {
- HashMap
newSubscribers=(HashMap)subscribers.clone();
- newSubscribers.remove(dest);
- subscribers=newSubscribers;
- connection.removeSession(dest,this);
- } else subscribers.put(dest,newSub);
- } else {
- //this should not happen
- HashMap newSubscribers=(HashMap)subscribers.clone();
- newSubscribers.remove(dest);
- subscribers=newSubscribers;
- }
- }
-
- }
}
1.10 +27 -39 spyderMQ/src/java/org/spyderMQ/SpyTopicSubscriber.java
Index: SpyTopicSubscriber.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyTopicSubscriber.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- SpyTopicSubscriber.java 2000/05/20 02:34:00 1.9
+++ SpyTopicSubscriber.java 2000/05/24 19:17:18 1.10
@@ -20,7 +20,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.9 $
+ * @version $Revision: 1.10 $
*/
public class SpyTopicSubscriber
extends SpyMessageConsumer
@@ -30,18 +30,15 @@
//The topic I registered
private Topic topic;
- //A link to my session queue (in my session)
- private SessionQueue mySessionQueue;
//Am I in local mode ?
boolean local;
// Constructor ---------------------------------------------------
- SpyTopicSubscriber(SpyTopicSession s,SessionQueue sq,Topic t,boolean local)
+ SpyTopicSubscriber(SpyTopicSession session,Topic topic,boolean local)
{
- super(s);
- topic=t;
- mySessionQueue=sq;
+ super(session);
+ this.topic=topic;
this.local=local;
}
@@ -49,44 +46,43 @@
public Topic getTopic() throws JMSException
{
- if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
-
+ if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
return topic;
}
public boolean getNoLocal() throws JMSException
{
- if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
-
+ if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
return local;
}
+ //Overrides MessageConsumer
+
public void close() throws JMSException
{
if (closed) return;
closed=true;
- session.removeConsumer(topic,mySessionQueue);
+ session.removeConsumer(topic,this);
- if (mySessionQueue.waitInReceive&&messageListener==null) {
+ if (waitInReceive&&messageListener==null) {
//A consumer could be waiting in receive()
- synchronized (mySessionQueue.messages) {
- mySessionQueue.messages.notify();
+ synchronized (messages) {
+ messages.notify();
}
+
}
}
-
- //Overrides MessageConsumer
-
+
public Message receive() throws JMSException
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
- synchronized (mySessionQueue.messages) {
+ synchronized (messages) {
//if the client follows the specification [4.4.6], he cannot
use this session
- //to asynchronously receive a message or receive() from
another thread.
+ //to asynchronously receive a message or receive() in another
thread.
//If a message is already pending for this session, we can
immediatly deliver it
while (true) {
@@ -94,16 +90,16 @@
if (closed) return null;
if (!session.modeStop) {
- Message
mes=mySessionQueue.getMessage(selector);
+ Message mes=getMessage();
if (mes!=null) return mes;
} else Log.log("the connection is stopped !");
try {
- mySessionQueue.waitInReceive=true;
- mySessionQueue.messages.wait();
+ waitInReceive=true;
+ messages.wait();
} catch (InterruptedException e) {
} finally {
- mySessionQueue.waitInReceive=false;
+ waitInReceive=false;
}
}
@@ -118,7 +114,7 @@
long endTime=(new Date()).getTime()+timeOut;
- synchronized (mySessionQueue.messages) {
+ synchronized (messages) {
//if the client respects the specification [4.4.6], he cannot
use this session
//to asynchronously receive a message or receive() from
another thread.
@@ -129,7 +125,7 @@
if (closed) return null;
if (!session.modeStop) {
- Message
mes=mySessionQueue.getMessage(selector);
+ Message mes=getMessage();
if (mes!=null) return mes;
} else Log.log("the connection is stopped !");
@@ -137,11 +133,11 @@
if (att<=0) return null;
try {
- mySessionQueue.waitInReceive=true;
- mySessionQueue.messages.wait(att);
+ waitInReceive=true;
+ messages.wait(att);
} catch (InterruptedException e) {
} finally {
- mySessionQueue.waitInReceive=false;
+ waitInReceive=false;
}
}
@@ -153,11 +149,11 @@
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
- synchronized (mySessionQueue.messages) {
+ synchronized (messages) {
while (true) {
if (session.modeStop) return null;
- return mySessionQueue.getMessage(selector);
+ return getMessage();
}
}
@@ -175,12 +171,4 @@
}
}
- // ----- Debug only ----- [not public part of the spec]
-
- public void setSelector(Selector selector,String messageSelector)
- {
- this.selector=selector;
- this.messageSelector=messageSelector;
- }
-
}