User: norbert 
  Date: 00/05/24 12:17:19

  Modified:    src/java/org/spyderMQ SessionQueue.java
                        SpyMessageConsumer.java SpyQueueReceiver.java
                        SpyQueueSession.java SpySession.java
                        SpyTopicSession.java SpyTopicSubscriber.java
  Log:
  Changes to the Pub/Sub System in order to implement P2P.
  Now, a SessionQueue is specific to a unique destination.
  This is a way to code 'local optimisations' too.
  Added bug (I think) in acknowledge()
  
  Selectors :
  Remove the .getClass().getName() [ thx Rich ]
  
  Revision  Changes    Path
  1.15      +74 -135   spyderMQ/src/java/org/spyderMQ/SessionQueue.java
  
  Index: SessionQueue.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SessionQueue.java,v
  retrieving revision 1.14
  retrieving revision 1.15
  diff -u -r1.14 -r1.15
  --- SessionQueue.java 2000/05/18 20:20:57     1.14
  +++ SessionQueue.java 2000/05/24 19:17:18     1.15
  @@ -11,6 +11,8 @@
   import javax.jms.JMSException;
   import javax.jms.Session;
   import java.util.LinkedList;
  +import java.util.HashSet;
  +import java.util.Iterator;
   import org.spydermq.selectors.Selector;
   
   /**
  @@ -18,163 +20,65 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.14 $
  + *   @version $Revision: 1.15 $
    */
   public class SessionQueue
   {
        // Attributes ----------------------------------------------------
   
  -     //the MessageConsumer of this queue
  -     SpyMessageConsumer destination;
  -     //List of Pending messages (not yet delivered)
  -     LinkedList messages;
        //List of messages waiting for acknoledgment
        LinkedList messagesWaitingForAck;
  -     //Is the consumer sleeping in a receive() ?
  -     boolean waitInReceive;
        //Is the session transacted ?
        boolean transacted;
        //What is the message acknowledgment mode ?
        int acknowledgeMode;
  +     //the MessageConsumers linked to this queue
  +     HashSet subscribers;
   
        // Constructor ---------------------------------------------------
           
  -     SessionQueue(boolean tr,int am)
  +     SessionQueue(boolean transacted,int acknowledgeMode)
        {
  -             messages=new LinkedList();
                messagesWaitingForAck=new LinkedList();
  -             waitInReceive=false;
  -             transacted=tr;
  -             acknowledgeMode=am;             
  +             subscribers=new HashSet();
  +             this.transacted=transacted;
  +             this.acknowledgeMode=acknowledgeMode;           
        }
   
        // Package protected ---------------------------------------------
            
  -     void setConsumer(SpyMessageConsumer consumer)
  -     {
  -             destination=consumer;
  -     }
        
  -     void addMessage(SpyMessage mes) throws JMSException
  -     {
  -             synchronized (messages) {
  -                     //Add a message to the queue
  -                     
  -                     //Test the priority
  -                     int pri=mes.getJMSPriority();
  -                     
  -                     if (pri<=4) {                           
  -                             //normal priority message
  -                             messages.addLast(mes);
  -                     } else {                                
  -                             //expedited priority message
  -                             int size=messages.size();
  -                             int i=0;                                
  -                             for(;i<size;i++) {
  -                                     if 
(((SpyMessage)messages.get(i)).getJMSPriority()<pri) break;
  -                             }                               
  -                             messages.add(i,mes);                            
  -                     }
  -                     
  -             }
  -     }
  -     
        //Send a message from the queue to a MessageConsumer
        boolean deliverMessage()
        {
  -             synchronized (messages) {
  +             boolean result=false;
  +             Iterator i=subscribers.iterator();
  +             
  +             while (i.hasNext()) {
                        
  -                     try {
  -                             
  -                             if (messages.size()==0) return false;
  +                     SpyMessageConsumer consumer=(SpyMessageConsumer)i.next();
  +
  +                     synchronized (consumer.messages) {
                                
  -                             MessageListener 
listener=destination.getMessageListener();
  -                     
  -                             if (listener==null) {
  -                                     if (!waitInReceive) return false;              
                 
  -                                     messages.notify();
  +                             if (consumer.messages.size()==0) continue;
  +                                     
  +                             if (consumer.messageListener==null) {
  +                                     if (!consumer.waitInReceive) continue;
  +                                     consumer.messages.notify();
                                } else {
  -                                     SpyMessage 
mes=getMessage(destination.selector);
  +                                     SpyMessage mes=consumer.getMessage();
                                        if (mes==null) return false;
  -                                     listener.onMessage(mes);
  +                                     consumer.messageListener.onMessage(mes);
                                }
  -                             return true;
                                
  -                     } catch (JMSException e) {
  -                             Log.error(e);
  -                             return false;
  +                             result=true;                                    
  +                                                                     
                        }
  -                     
  -             }
  -     }
  -     
  -     SpyMessage getMessage(Selector selector)
  -     {
  -             synchronized (messages) {
  -                             
  -                     while (true) {
  -
  -                             try {
  -                                     if (messages.size()==0) return null;
  -                             
  -                                     SpyMessage 
mes=(SpyMessage)messages.removeFirst();
  -                             
  -                                     if (mes.isOutdated()) {
  -                                             Log.log("SessionQueue: I dropped a 
message (timeout)");
  -                                             continue;
  -                                     }
  -                                             
  -                                     if (selector!=null) {
  -                                             if (!selector.test(mes)) {
  -                                                     Log.log("SessionQueue: I 
dropped a message (selector)");
  -                                                     continue;
  -                                             } else {
  -                                                     Log.log("SessionQueue: 
selector evaluates TRUE");
  -                                             }
  -                                     }
  -                                             
  -                                     //the SAME Message object is put in different 
SessionQueues
  -                                     //when we deliver it, we have to clone() it to 
insure independance
  -                                     SpyMessage message=mes.myClone();
  -                                                                                    
                                                         
  -                                     if (!transacted) {
  -                                             if 
(acknowledgeMode==Session.CLIENT_ACKNOWLEDGE) {
  -                                                             
  -                                                     synchronized 
(messagesWaitingForAck) {
  -                                                             //Put the message in 
the messagesWaitForAck queue
  -                                                             
messagesWaitingForAck.addLast(message);
  -                                                     }
  -                                                             
  -                                                     message.setSessionQueue(this);
  -                                                             
  -                                             } else if 
(acknowledgeMode==Session.DUPS_OK_ACKNOWLEDGE) {
  -                                                     //DUPS_OK_ACKNOWLEDGE
  -                                             } else {
  -                                                     //AUTO_ACKNOWLEDGE 
  -                                                     //we don't need to keep this 
message in a queue
  -                                             }
  -                                     } else {
  -                                                     
  -                                             //We are linked to a transacted 
session                                                                                
 
  -                                                     
  -                                             synchronized (messagesWaitingForAck) {
  -                                                     //Put the message in the 
messagesWaitForAck queue
  -                                                     
messagesWaitingForAck.addLast(message);
  -                                             }
  -                                                     
  -                                     }
  -                                             
  -                                     return message;
  -                                                                             
  -                             } catch (Exception e) {
  -                                     Log.error(e);
  -                             }
  -
  -                     }
  -                     
  +             
                }
  +             
  +             return result;
        }
  -
        
        //A message has been acknowledged
        void acknowledge(SpyMessage mes)
  @@ -194,26 +98,41 @@
                
        }
        
  +     //notify the sleeping synchronous listeners
  +     void close() throws JMSException 
  +     {
  +             Iterator i=subscribers.iterator();
  +             
  +             while (i.hasNext()) {                   
  +                     SpyMessageConsumer consumer=(SpyMessageConsumer)i.next();
  +                     consumer.close();
  +             }
  +             
  +     }
  +
        //the session is about to recover
        void recover() throws JMSException
        {
  -             synchronized (messages) {
  -                     synchronized (messagesWaitingForAck) {
  +             synchronized (messagesWaitingForAck) {
                                
  -                             while (messagesWaitingForAck.size()!=0) {
  -                                     
  -                                     //Get the most recent unacknowledged message
  -                                     SpyMessage 
mes=(SpyMessage)messagesWaitingForAck.removeLast();
  +                     while (messagesWaitingForAck.size()!=0) {
                                        
  -                                     //This message is redelivered                  
                 
  -                                     mes.setJMSRedelivered(true);
  +                             //Get the most recent unacknowledged message
  +                             SpyMessage 
mes=(SpyMessage)messagesWaitingForAck.removeLast();
                                        
  -                                     //Put it in the incoming queue
  -                                     messages.addFirst(mes);
  +                             //This message is redelivered                          
         
  +                             mes.setJMSRedelivered(true);
                                        
  +                             //Put the message in one incoming queue - Is it what 
the spec says ?
  +                             Iterator i=subscribers.iterator();
  +             
  +                             if (i.hasNext()) {                      
  +                                     SpyMessageConsumer 
consumer=(SpyMessageConsumer)i.next();
  +                                     consumer.addMessage(mes);
                                }
  -                             
  +                                     
                        }
  +                             
                }
        }
   
  @@ -225,5 +144,25 @@
                }
        }       
        
  +     void notifyReceiverStopped(boolean newMode)
  +     {
  +             //For Queues. Not yet implemented -
  +     }
  +     
  +     synchronized void addConsumer(SpyMessageConsumer consumer)
  +     {
  +             consumer.setSessionQueue(this);
  +             HashSet newSet=(HashSet)subscribers.clone();
  +             newSet.add(consumer);
  +             subscribers=newSet;
  +     }
   
  +     synchronized boolean removeConsumer(MessageConsumer consumer)
  +     {
  +             HashSet newSet=(HashSet)subscribers.clone();
  +             newSet.remove(consumer);
  +             subscribers=newSet;
  +             return subscribers.size()==0;
  +     }
  +     
   }
  
  
  
  1.12      +128 -7    spyderMQ/src/java/org/spyderMQ/SpyMessageConsumer.java
  
  Index: SpyMessageConsumer.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyMessageConsumer.java,v
  retrieving revision 1.11
  retrieving revision 1.12
  diff -u -r1.11 -r1.12
  --- SpyMessageConsumer.java   2000/05/20 02:34:00     1.11
  +++ SpyMessageConsumer.java   2000/05/24 19:17:18     1.12
  @@ -10,16 +10,17 @@
   import javax.jms.JMSException;
   import javax.jms.MessageListener;
   import javax.jms.Message;
  +import javax.jms.Session;
   import java.util.LinkedList;
   import java.util.Date;
   import org.spydermq.selectors.Selector;
   
   /**
  - *   This class implements javax.jms.MessageConsumer - Going to be deprecated
  + *   This class implements javax.jms.MessageConsumer
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.11 $
  + *   @version $Revision: 1.12 $
    */
   public class SpyMessageConsumer 
        implements MessageConsumer
  @@ -29,13 +30,19 @@
        //Link to my session
        protected SpySession session;
        //My message listener (null if none)
  -     public MessageListener messageListener;
  +     MessageListener messageListener;
        //Am I closed ?
        protected boolean closed;
        //Do I have a selector
        public Selector selector;
        //The message selector
        public String messageSelector;
  +     //A link to my session queue (in my session)
  +     protected SessionQueue sessionQueue;
  +     //List of Pending messages (not yet delivered)
  +     LinkedList messages;
  +     //Is the consumer sleeping in a receive() ?
  +     boolean waitInReceive;
        
        // Constructor ---------------------------------------------------
           
  @@ -46,8 +53,15 @@
                closed=false;
                selector=null;
                messageSelector=null;
  +             messages=new LinkedList();
  +             waitInReceive=false;
        }
        
  +     void setSessionQueue(SessionQueue sessionQueue)
  +     {
  +             this.sessionQueue=sessionQueue;
  +     }               
  +     
        // Public --------------------------------------------------------
   
       public String getMessageSelector() throws JMSException
  @@ -67,29 +81,136 @@
       public void setMessageListener(MessageListener listener) throws JMSException
        {       
                //Job is done in the inherited classes
  +             //The QueueReceiver object need to notify their session / connection / 
the broker
  +             throw new RuntimeException("pure virtual call");
        }
   
       public Message receive() throws JMSException
        {
                //Job is done in the inherited classes
  -             return null;
  +             //The QueueReceiver object need to notify their session / connection / 
the broker
  +             throw new RuntimeException("pure virtual call");
        }
   
       public Message receive(long timeOut) throws JMSException
        {
                //Job is done in the inherited classes
  -             return null;
  +             //The QueueReceiver object need to notify their session / connection / 
the broker
  +             throw new RuntimeException("pure virtual call");
        }
   
       public Message receiveNoWait() throws JMSException
        {
                //Job is done in the inherited classes
  -             return null;
  +             //The QueueReceiver object need to notify their session / connection / 
the broker
  +             throw new RuntimeException("pure virtual call");
        }
   
       public synchronized void close() throws JMSException
  +     {
  +             //Job is done in the inherited classes
  +             //The QueueReceiver object need to notify their session / connection / 
the broker
  +             throw new RuntimeException("pure virtual call");
  +     }
  +     
  +     //Package protected - Not part of the spec
  +     
  +     void setSelector(Selector selector,String messageSelector)
  +     {
  +             this.selector=selector;
  +             this.messageSelector=messageSelector;
  +     }
  +     
  +     SpyMessage getMessage()
  +     {
  +             synchronized (messages) {
  +                             
  +                     while (true) {
  +
  +                             try {
  +                                     if (messages.size()==0) return null;
  +                             
  +                                     SpyMessage 
mes=(SpyMessage)messages.removeFirst();
  +                             
  +                                     if (mes.isOutdated()) {
  +                                             Log.log("SessionQueue: I dropped a 
message (timeout)");
  +                                             continue;
  +                                     }
  +                                             
  +                                     if (selector!=null) {
  +                                             if (!selector.test(mes)) {
  +                                                     Log.log("SessionQueue: I 
dropped a message (selector)");
  +                                                     continue;
  +                                             } else {
  +                                                     Log.log("SessionQueue: 
selector evaluates TRUE");
  +                                             }
  +                                     }
  +                                             
  +                                     //the SAME Message object is put in different 
SessionQueues
  +                                     //when we deliver it, we have to clone() it to 
insure independance
  +                                     SpyMessage message=mes.myClone();
  +                                                                                    
                                                         
  +                                     if (!session.transacted) {
  +                                             if 
(session.acknowledgeMode==Session.CLIENT_ACKNOWLEDGE) {
  +                                                             
  +                                                     synchronized 
(sessionQueue.messagesWaitingForAck) {
  +                                                             //Put the message in 
the messagesWaitForAck queue
  +                                                             
sessionQueue.messagesWaitingForAck.addLast(message);
  +                                                     }
  +                                                             
  +                                                     
message.setSessionQueue(sessionQueue);
  +                                                             
  +                                             } else if 
(sessionQueue.acknowledgeMode==Session.DUPS_OK_ACKNOWLEDGE) {
  +                                                     //DUPS_OK_ACKNOWLEDGE
  +                                             } else {
  +                                                     //AUTO_ACKNOWLEDGE 
  +                                                     //we don't need to keep this 
message in a queue
  +                                             }
  +                                     } else {
  +                                                     
  +                                             //We are linked to a transacted 
session                                                                                
 
  +                                                     
  +                                             synchronized 
(sessionQueue.messagesWaitingForAck) {
  +                                                     //Put the message in the 
messagesWaitForAck queue
  +                                                     
sessionQueue.messagesWaitingForAck.addLast(message);
  +                                             }
  +                                                     
  +                                     }
  +                                             
  +                                     return message;
  +                                                                             
  +                             } catch (Exception e) {
  +                                     Log.error(e);
  +                             }
  +
  +                     }
  +                     
  +             }
  +
  +     }
  +     
  +     void addMessage(SpyMessage mes) throws JMSException
        {
  -             //Job is done in the inherited classes          
  +             synchronized (messages) {
  +                     //Add a message to the queue
  +                     
  +                     //Test the priority
  +                     int pri=mes.getJMSPriority();
  +                     
  +                     if (pri<=4) {                           
  +                             //normal priority message
  +                             messages.addLast(mes);
  +                     } else {                                
  +                             //expedited priority message
  +                             int size=messages.size();
  +                             int i=0;                                
  +                             for(;i<size;i++) {
  +                                     if 
(((SpyMessage)messages.get(i)).getJMSPriority()<pri) break;
  +                             }                               
  +                             messages.add(i,mes);                            
  +                     }
  +                     
  +             }
        }
        
   }
  
  
  
  1.3       +5 -5      spyderMQ/src/java/org/spyderMQ/SpyQueueReceiver.java
  
  Index: SpyQueueReceiver.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyQueueReceiver.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- SpyQueueReceiver.java     2000/05/24 02:56:41     1.2
  +++ SpyQueueReceiver.java     2000/05/24 19:17:18     1.3
  @@ -17,7 +17,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   public class SpyQueueReceiver 
        extends SpyMessageConsumer 
  @@ -32,7 +32,7 @@
        
        // Constructor ---------------------------------------------------
           
  -    SpyQueueReceiver(SpyQueueSession s,SessionQueue sq,Queue q,boolean stop) 
  +    SpyQueueReceiver(SpyQueueSession s,Queue q,boolean stop) 
        {
                super(s);
                queue=q;
  @@ -93,11 +93,11 @@
        {       
                if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");                   
                
  -/*           if (listener==null) {
  -                     mySessionQueue.notifyReceiverStopped(false);
  +             if (listener==null) {
  +                     sessionQueue.notifyReceiverStopped(false);
                } else {
  -                     mySessionQueue.notifyReceiverStopped(true);
  -             }*/
  +                     sessionQueue.notifyReceiverStopped(true);
  +             }
   
                messageListener=listener;
                
  
  
  
  1.8       +5 -15     spyderMQ/src/java/org/spyderMQ/SpyQueueSession.java
  
  Index: SpyQueueSession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyQueueSession.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- SpyQueueSession.java      2000/05/24 02:56:41     1.7
  +++ SpyQueueSession.java      2000/05/24 19:17:18     1.8
  @@ -23,7 +23,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.7 $
  + *   @version $Revision: 1.8 $
    */
   public class SpyQueueSession 
        extends SpySession 
  @@ -65,8 +65,10 @@
       public QueueReceiver createReceiver(Queue queue) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
  -                                                                             
  -             SpyQueueReceiver receiver=new 
SpyQueueReceiver(this,null,queue,modeStop);
  +
  +             SpyQueueReceiver receiver=new SpyQueueReceiver(this,queue,modeStop);
  +             SessionQueue sessionQueue=addConsumer(queue,receiver);
  +             
                return receiver;
        }
   
  @@ -144,19 +146,7 @@
                {
                        thread.notify();
                }
  -     }
  -     
  -     void addConsumer(Destination dest, SessionQueue who) throws JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The session is closed");
  -                                                                             
  -             //Not implemented yet
  -     }
  -
  -     void removeConsumer(Destination dest, SessionQueue who) throws JMSException
  -     {
  -             //Not implemented yet
  -     }
  +     }       
        
        //One receiver is changing its mode
        synchronized void notifyReceiverStopped(SpyQueueReceiver receiver,boolean 
newMode)
  
  
  
  1.16      +89 -67    spyderMQ/src/java/org/spyderMQ/SpySession.java
  
  Index: SpySession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpySession.java,v
  retrieving revision 1.15
  retrieving revision 1.16
  diff -u -r1.15 -r1.16
  --- SpySession.java   2000/05/22 17:32:04     1.15
  +++ SpySession.java   2000/05/24 19:17:18     1.16
  @@ -28,7 +28,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.15 $
  + *   @version $Revision: 1.16 $
    */
   public class SpySession 
        implements Runnable, Session
  @@ -37,21 +37,21 @@
        // Attributes ----------------------------------------------------
   
        //Is this session transacted ?
  -     protected boolean transacted=false;
  +     protected boolean transacted;
        //What is the type of acknowledgement ?
  -     protected int acknowledgeMode=AUTO_ACKNOWLEDGE;
  +     protected int acknowledgeMode;
        //The messageListener for this session
  -     private MessageListener messageListener=null;
  +     private MessageListener messageListener;
        //The connection object to which this session is linked
        protected SpyConnection connection;
  -     //HashMap of (HashSet of SessionQueue) by Destination
  -     HashMap subscribers;
  +     //HashMap of SessionQueue by Destination
  +     HashMap destinations;
        //The outgoing message queue 
        protected LinkedList outgoingQueue;
        //The outgoing message queue for messages that have been commited (if the 
session is transacted)
        protected LinkedList outgoingCommitedQueue;
        //Is my connection in stopped mode ?
  -     protected boolean modeStop=false;
  +     protected boolean modeStop;
        //Is the session closed ?
        boolean closed;
        //This object is the object used to synchronize the session's thread
  @@ -66,10 +66,11 @@
                connection=conn;
                transacted=trans;
                acknowledgeMode=acknowledge;
  -             subscribers=new HashMap();
  +             destinations=new HashMap();
                outgoingQueue=new LinkedList();
                outgoingCommitedQueue=new LinkedList();
                modeStop=stop;
  +             messageListener=null;
                closed=false;
                thread=new Integer(0);
                this.isTopic=isTopic;
  @@ -212,20 +213,16 @@
                                
                                if (!modeStop) {
                                                
  -                                     Collection values = subscribers.values();
  -                                     Iterator i1=values.iterator();
  -                                     while (i1.hasNext()) {
  -                                             HashSet set=(HashSet)i1.next();
  -                                             Iterator i2=set.iterator();
  -                                             while (i2.hasNext()) {
  -                                                     SessionQueue 
sessionQueue=(SessionQueue)i2.next();
  -                                                     
doneJob=doneJob||sessionQueue.deliverMessage();
  -                                             }
  -                                     }       
  -                                                                                    
         
  +                                     Collection values = destinations.values();
  +                                     Iterator i=values.iterator();
  +                                     while (i.hasNext()) {
  +                                             SessionQueue 
sessionQueue=(SessionQueue)i.next();
  +                                             
doneJob=doneJob||sessionQueue.deliverMessage();
  +                                     }
  +                                     
                                }
                                        
  -                             //If there were smthg to do, try again
  +                             //If there was smthg to do, try again
                                if (doneJob&&!modeStop) continue;
                                        
                                try {
  @@ -234,12 +231,11 @@
                                        Log.log("SessionThread: I wake up");
                                } catch (InterruptedException e) {
                                }
  -                                     
  +
                        }
                }
        }
   
  -     //CHECK for queues !
        public synchronized void close() throws JMSException
        {
                if (closed) return;
  @@ -253,16 +249,11 @@
                
                //notify the sleeping synchronous listeners
                
  -             Collection values = subscribers.values();
  -             Iterator i1=values.iterator();
  -             while (i1.hasNext()) {
  -                     HashSet set=(HashSet)i1.next();
  -                     Iterator i2=set.iterator();
  -                     while (i2.hasNext()) {
  -                             SessionQueue sessionQueue=(SessionQueue)i2.next();
  -                             //close() each SessionQueue (MessageConsumer) linked to this Session
  -                             sessionQueue.destination.close();
  -                     }
  +             Collection values = destinations.values();
  +             Iterator i=values.iterator();
  +             while (i.hasNext()) {
  +                     SessionQueue sessionQueue=(SessionQueue)i.next();
  +                     sessionQueue.close();
                }       
                
                connection.sessionClosing(this);
  @@ -272,6 +263,7 @@
        public void dispatchMessage(Destination dest, SpyMessage mes) throws JMSException
        {
                //The job is done in inherited classes
  +             throw new RuntimeException("pure virtual call");
        }       
        
        //Commit a transacted session
  @@ -293,15 +285,11 @@
                        outgoingQueue.clear();
                        
                        //Notify each SessionQueue that we are going to commit
  -                     Collection values = subscribers.values();
  -                     Iterator i1=values.iterator();
  -                     while (i1.hasNext()) {
  -                             HashSet set=(HashSet)i1.next();
  -                             Iterator i2=set.iterator();
  -                             while (i2.hasNext()) {
  -                                     SessionQueue sessionQueue=(SessionQueue)i2.next();
  -                                     sessionQueue.commit();
  -                             }
  +                     Collection values = destinations.values();
  +                     Iterator i=values.iterator();
  +                     while (i.hasNext()) {
  +                             SessionQueue sessionQueue=(SessionQueue)i.next();
  +                             sessionQueue.commit();
                        }       
                        
                        //We have finished our work, we can wake up the thread
  @@ -329,15 +317,11 @@
                        outgoingQueue.clear();
                        
                        //Notify each SessionQueue that we are going to rollback
  -                     Collection values = subscribers.values();
  -                     Iterator i1=values.iterator();
  -                     while (i1.hasNext()) {
  -                             HashSet set=(HashSet)i1.next();
  -                             Iterator i2=set.iterator();
  -                             while (i2.hasNext()) {
  -                                     SessionQueue sessionQueue=(SessionQueue)i2.next();
  -                                     sessionQueue.recover();
  -                             }
  +                     Collection values = destinations.values();
  +                     Iterator i=values.iterator();
  +                     while (i.hasNext()) {
  +                             SessionQueue sessionQueue=(SessionQueue)i.next();
  +                             sessionQueue.recover();
                        }       
                        
                        //We have finished our work, we can wake up the thread
  @@ -360,15 +344,11 @@
                synchronized (thread) {
                        
                        //Notify each SessionQueue that we are going to recover
  -                     Collection values = subscribers.values();
  -                     Iterator i1=values.iterator();
  -                     while (i1.hasNext()) {
  -                             HashSet set=(HashSet)i1.next();
  -                             Iterator i2=set.iterator();
  -                             while (i2.hasNext()) {
  -                                     SessionQueue sessionQueue=(SessionQueue)i2.next();
  -                                     sessionQueue.recover();
  -                             }
  +                     Collection values = destinations.values();
  +                     Iterator i=values.iterator();
  +                     while (i.hasNext()) {
  +                             SessionQueue sessionQueue=(SessionQueue)i.next();
  +                             sessionQueue.recover();
                        }       
                        
                        //We have finished our work, we can wake up the thread
  @@ -384,10 +364,10 @@
                Log.log("SpySession: deleteDestination(dest="+dest.toString()+")");
                
                //Remove it from the subscribers list
  -             synchronized (subscribers) {
  -                     HashMap newMap=(HashMap)subscribers.clone();    
  +             synchronized (destinations) {
  +                     HashMap newMap=(HashMap)destinations.clone();   
                        newMap.remove(dest);
  -                     subscribers=newMap;
  +                     destinations=newMap;
                }
                
                //We could look at our incoming and outgoing queues to drop messages
  @@ -395,11 +375,52 @@
        
        // Package protected ---------------------------------------------
            
  -     void removeConsumer(Destination d, SessionQueue who) throws JMSException
  +     SessionQueue addConsumer(Destination dest, SpyMessageConsumer who) throws JMSException
        {
  -             //The job is done in the inherited classes
  -     }       
  +             if (closed) throw new IllegalStateException("The session is closed");
  +                                                                             
  +             Log.log("Session: subscribe(dest="+dest.toString()+",MessageConsumer="+who.toString()+")");
                
  +             synchronized (destinations) {
  +                     SessionQueue sub=(SessionQueue)destinations.get(dest);
  +                     if (sub==null) {
  +                             sub=new SessionQueue(transacted,acknowledgeMode);
  +                             sub.addConsumer(who);
  +                             HashMap newDestinations=(HashMap)destinations.clone();
  +                             newDestinations.put(dest,sub);
  +                             destinations=newDestinations;
  +                             connection.addSession(dest,this);
  +                     } else {
  +                             sub.addConsumer(who);
  +                     }               
  +                     return sub;
  +             }
  +     }
  +
  +     void removeConsumer(Destination dest, SpyMessageConsumer who) throws JMSException
  +     {
  +             Log.log("Session: removeConsumer(Destination="+dest.toString()+",MessageConsumer="+who.toString()+")");
  +             
  +             synchronized (destinations) {
  +                     SessionQueue sub=(SessionQueue)destinations.get(dest);
  +                     if (sub!=null) {                                               
                 
  +                             boolean empty=sub.removeConsumer(who);
  +                             if (empty) {
  +                                     HashMap newDestinations=(HashMap)destinations.clone();
  +                                     newDestinations.remove(dest);
  +                                     destinations=newDestinations;
  +                                     connection.removeSession(dest,this);
  +                             } 
  +                     } else {
  +                             //this should not happen
  +                             HashMap newDestinations=(HashMap)destinations.clone();
  +                             newDestinations.remove(dest);
  +                             destinations=newDestinations;
  +                     }
  +             }                       
  +             
  +     }
  +             
        String getNewMessageID() throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
  @@ -407,10 +428,11 @@
                return connection.getNewMessageID();
        }
        
  -     //The connection a changed its mode (stop() or start())
  +     //The connection has changed its mode (stop() or start())
        //We have to wait until message delivery has stopped or wake up the thread
        void notifyStopMode(boolean newValue)
        {
  +             
                if (closed) throw new IllegalStateException("The session is closed");  
                                                                  
                if (modeStop==newValue) return; 
                
  @@ -422,7 +444,7 @@
                        synchronized (thread) {
                                ;
                        }
  -                     
  +                                             
                } else {
                        
                        //Wake up the thread
  
  
  
  1.24      +11 -63    spyderMQ/src/java/org/spyderMQ/SpyTopicSession.java
  
  Index: SpyTopicSession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyTopicSession.java,v
  retrieving revision 1.23
  retrieving revision 1.24
  diff -u -r1.23 -r1.24
  --- SpyTopicSession.java      2000/05/22 17:32:04     1.23
  +++ SpyTopicSession.java      2000/05/24 19:17:18     1.24
  @@ -24,7 +24,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.23 $
  + *   @version $Revision: 1.24 $
    */
   public class SpyTopicSession 
        extends SpySession 
  @@ -58,10 +58,8 @@
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                
  -             SessionQueue sessionQueue=new SessionQueue(transacted,acknowledgeMode);
  -             SpyTopicSubscriber sub=new SpyTopicSubscriber(this,sessionQueue,topic,noLocal);
  -             sessionQueue.setConsumer(sub);
  -             addConsumer(topic,sessionQueue);
  +             SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,noLocal);
  +             SessionQueue sessionQueue=addConsumer(topic,sub);
                
                if (messageSelector!=null) {
                        Selector selector=new Selector(messageSelector);        
  @@ -105,11 +103,10 @@
                //Not implemented yet
        }
        
  -     //overides SpySession
        
  +     // - Package protected ---------------------------------------------
  +     // - Not part of the spec
        
  -     //Not part of the spec
  -     
        //Called by the ConnectionReceiver object : put a new msg in the receiver's queue
        public void dispatchMessage(Destination dest,SpyMessage mes) throws JMSException
        {
  @@ -118,22 +115,20 @@
                Log.log("Session: dispatchMessage(Destination="+dest.toString()+",Mes="+mes.toString()+")");
                
                if (mes.isOutdated()) return;
  -
  -             //Get the set of SessionQueue (MessageConsumers) for this Destination  
         
  -             HashSet set=(HashSet)subscribers.get(dest);
  -             if (set==null||set.isEmpty()) return;
   
  -             //Work on the set of consumers for this topic
  +             //Get the SessionQueue for this Destination
  +             SessionQueue sessionQueue=(SessionQueue)destinations.get(dest);
  +             if (sessionQueue==null) return;
                
  -             Iterator i=set.iterator();                              
  +             //Work on the set of SpyTopicSubscriber for this topic          
  +             Iterator i=sessionQueue.subscribers.iterator();                        
 
                while (i.hasNext()) {   
  -                     SessionQueue sub=(SessionQueue)i.next();
  +                     SpyTopicSubscriber sub=(SpyTopicSubscriber)i.next();
                        sub.addMessage(mes);
                }
                
        }
        
  -     // Package protected ---------------------------------------------
            
        //called by a MessageProducer object which needs to publish a message
        void sendMessage(SpyMessage m) throws JMSException
  @@ -176,52 +171,5 @@
   
        }
        
  -     void addConsumer(Destination dest, SessionQueue who) throws JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The session is closed");
  -                                                                             
  -             Log.log("TopicSession: subscribe(dest="+dest.toString()+",QueueSession="+who.toString()+")");
  -             
  -             synchronized (subscribers) {
  -                     HashSet sub=(HashSet)subscribers.get(dest);
  -                     if (sub==null) {
  -                             sub=new HashSet();
  -                             sub.add(who);
  -                             HashMap newSubscribers=(HashMap)subscribers.clone();
  -                             newSubscribers.put(dest,sub);
  -                             subscribers=newSubscribers;
  -                             connection.addSession(dest,this);
  -                     } else {
  -                             HashSet newSub=(HashSet)sub.clone();
  -                             newSub.add(who);
  -                             subscribers.put(dest,newSub);
  -                     }               
  -             }
  -     }
  -
  -     void removeConsumer(Destination dest, SessionQueue who) throws JMSException
  -     {
  -             Log.log("TopicSession: removeConsumer(Destination="+dest.toString()+",QueueSession="+who.toString()+")");
  -             
  -             synchronized (subscribers) {
  -                     HashSet sub=(HashSet)subscribers.get(dest);
  -                     if (sub!=null) {
  -                             HashSet newSub=(HashSet)sub.clone();
  -                             newSub.remove(who);
  -                             if (newSub.isEmpty()) {
  -                                     HashMap newSubscribers=(HashMap)subscribers.clone();
  -                                     newSubscribers.remove(dest);
  -                                     subscribers=newSubscribers;
  -                                     connection.removeSession(dest,this);
  -                             } else subscribers.put(dest,newSub);
  -                     } else {
  -                             //this should not happen
  -                             HashMap newSubscribers=(HashMap)subscribers.clone();
  -                             newSubscribers.remove(dest);
  -                             subscribers=newSubscribers;
  -                     }
  -             }                       
  -             
  -     }
        
   }
  
  
  
  1.10      +27 -39    spyderMQ/src/java/org/spyderMQ/SpyTopicSubscriber.java
  
  Index: SpyTopicSubscriber.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyTopicSubscriber.java,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -r1.9 -r1.10
  --- SpyTopicSubscriber.java   2000/05/20 02:34:00     1.9
  +++ SpyTopicSubscriber.java   2000/05/24 19:17:18     1.10
  @@ -20,7 +20,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.9 $
  + *   @version $Revision: 1.10 $
    */
   public class SpyTopicSubscriber 
        extends SpyMessageConsumer 
  @@ -30,18 +30,15 @@
   
        //The topic I registered
        private Topic topic;
  -     //A link to my session queue (in my session)
  -     private SessionQueue mySessionQueue;
        //Am I in local mode ?
        boolean local;
   
        // Constructor ---------------------------------------------------
           
  -    SpyTopicSubscriber(SpyTopicSession s,SessionQueue sq,Topic t,boolean local) 
  +    SpyTopicSubscriber(SpyTopicSession session,Topic topic,boolean local) 
        {
  -             super(s);
  -             topic=t;
  -             mySessionQueue=sq;
  +             super(session);
  +             this.topic=topic;
                this.local=local;
        }
   
  @@ -49,44 +46,43 @@
   
        public Topic getTopic() throws JMSException
        {
  -             if (closed) throw new IllegalStateException("The MessageConsumer is closed");   
  -                             
  +             if (closed) throw new IllegalStateException("The MessageConsumer is closed");                                   
                return topic;
        }
   
       public boolean getNoLocal() throws JMSException
        {
  -             if (closed) throw new IllegalStateException("The MessageConsumer is closed");   
  -             
  +             if (closed) throw new IllegalStateException("The MessageConsumer is closed");                   
                return local;
        }
        
  +     //Overrides MessageConsumer
  +
        public void close() throws JMSException
        {               
                if (closed) return;
                closed=true;
   
  -             session.removeConsumer(topic,mySessionQueue);
  +             session.removeConsumer(topic,this);
                
  -             if (mySessionQueue.waitInReceive&&messageListener==null) {
  +             if (waitInReceive&&messageListener==null) {
                        
                        //A consumer could be waiting in receive()
  -                     synchronized (mySessionQueue.messages) {
  -                             mySessionQueue.messages.notify();
  +                     synchronized (messages) {
  +                             messages.notify();
                        }
  +                     
                }
        }
  -     
  -     //Overrides MessageConsumer
  -     
  +             
       public Message receive() throws JMSException
        {
                if (closed) throw new IllegalStateException("The MessageConsumer is closed");   
                
  -             synchronized (mySessionQueue.messages) {
  +             synchronized (messages) {
                        
                        //if the client follows the specification [4.4.6], he cannot use this session
  -                     //to asynchronously receive a message or receive() from another thread.
  +                     //to asynchronously receive a message or receive() in another thread.
                        //If a message is already pending for this session, we can immediatly deliver it 
                        
                        while (true) {
  @@ -94,16 +90,16 @@
                                if (closed) return null;
                                
                                if (!session.modeStop) {
  -                                     Message mes=mySessionQueue.getMessage(selector);
  +                                     Message mes=getMessage();
                                        if (mes!=null) return mes;
                                } else Log.log("the connection is stopped !");
                                
                                try {
  -                                     mySessionQueue.waitInReceive=true;
  -                                     mySessionQueue.messages.wait();
  +                                     waitInReceive=true;
  +                                     messages.wait();
                                } catch (InterruptedException e) {
                                } finally {
  -                                     mySessionQueue.waitInReceive=false;
  +                                     waitInReceive=false;
                                }
                                
                        }
  @@ -118,7 +114,7 @@
                
                long endTime=(new Date()).getTime()+timeOut;
                
  -             synchronized (mySessionQueue.messages) {
  +             synchronized (messages) {
                        
                        //if the client respects the specification [4.4.6], he cannot use this session
                        //to asynchronously receive a message or receive() from another thread.
  @@ -129,7 +125,7 @@
                                if (closed) return null;
                                
                                if (!session.modeStop) {
  -                                     Message mes=mySessionQueue.getMessage(selector);
  +                                     Message mes=getMessage();
                                        if (mes!=null) return mes;
                                } else Log.log("the connection is stopped !");
                                
  @@ -137,11 +133,11 @@
                                if (att<=0) return null;
                                
                                try {                                   
  -                                     mySessionQueue.waitInReceive=true;
  -                                     mySessionQueue.messages.wait(att);
  +                                     waitInReceive=true;
  +                                     messages.wait(att);
                                } catch (InterruptedException e) {
                                } finally {
  -                                     mySessionQueue.waitInReceive=false;
  +                                     waitInReceive=false;
                                }
                                
                        }
  @@ -153,11 +149,11 @@
        {
                if (closed) throw new IllegalStateException("The MessageConsumer is closed");   
                
  -             synchronized (mySessionQueue.messages) {
  +             synchronized (messages) {
                        
                        while (true) {
                                if (session.modeStop) return null;
  -                             return mySessionQueue.getMessage(selector);
  +                             return getMessage();
                        }
                                                
                }
  @@ -175,12 +171,4 @@
                }
        }
        
  -     // ----- Debug only ----- [not public part of the spec]
  -     
  -     public void setSelector(Selector selector,String messageSelector)
  -     {
  -             this.selector=selector;
  -             this.messageSelector=messageSelector;
  -     }
  -
   }
  
  
  

Reply via email to