User: norbert 
  Date: 00/05/18 13:21:05

  Modified:    src/java/org/spyderMQ JMSServer.java JMSServerQueue.java
                        SessionQueue.java SpyMessageConsumer.java
                        SpyQueueSession.java SpySession.java
                        SpyTopicPublisher.java SpyTopicSession.java
                        SpyTopicSubscriber.java
  Log:
  second step for queues
  
  Revision  Changes    Path
  1.29      +8 -25     spyderMQ/src/java/org/spyderMQ/JMSServer.java
  
  Index: JMSServer.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/JMSServer.java,v
  retrieving revision 1.28
  retrieving revision 1.29
  diff -u -r1.28 -r1.29
  --- JMSServer.java    2000/05/18 00:16:09     1.28
  +++ JMSServer.java    2000/05/18 20:20:56     1.29
  @@ -20,7 +20,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.28 $
  + *   @version $Revision: 1.29 $
    */
   public class JMSServer 
                implements Runnable 
  @@ -83,8 +83,9 @@
                        synchronized (taskQueue)
                        {
                                while (queue==null) {                                  
 
  -                                                                             
  -                                     int size=taskQueue.size(); // size() is O(1) 
in LinkedList... 
  +                                     
  +                                     // size() is O(1) in LinkedList... 
  +                                     int size=taskQueue.size(); 
                                        if (size!=0) { 
                                                
queue=(JMSServerQueue)taskQueue.removeFirst();
                                                //One other thread can start working 
on the task queue...
  @@ -99,28 +100,10 @@
                                        }
                                        
                                }
  -                     }
  -                     
  -                     //Clear the message queue for this destination
  -                     SpyMessage[] msgs=queue.startWork();
  -                     
  -                     //Let the thread do its work
  -                     if ((queue.destination instanceof Topic)&&msgs.length>1) {
  -                             //We can send multiple messages
  -                             Log.log(" Send Msgs[1.."+msgs.length+"]");
  -                             queue.sendMultipleMessages(msgs);
  -                     } else {
  -                             //Send each message
  -                             for(int i=0;i<msgs.length;i++) {
  -                                     SpyMessage message=(SpyMessage)msgs[i];
  -                                     Log.log(" Send Msg : "+message.toString());
  -                                     if (!message.isOutdated())
  -                                             queue.sendOneMessage(message);
  -                             }                       
                        }
  -                     
  -                     //Notify that it has finished his work : now, another thread 
can start working on this destination
  -                     queue.endWork();
  +
  +                     //Ask the queue to do its job
  +                     queue.doMyJob();
                        
                }
                        
  @@ -191,7 +174,7 @@
        //A connection has send a new message
       public void newMessage(SpyMessage val,String id) throws JMSException 
        {
  -             Log.log("JMSserver: newMessage(val="+val.toString()+")");
  +             Log.log("JMSserver: 
newMessage(dest="+val.jmsDestination+",val="+val.toString()+")");
                
                JMSServerQueue 
queue=(JMSServerQueue)messageQueue.get(val.jmsDestination);
                if (queue==null) throw new JMSException("This destination does not 
exist !");
  
  
  
  1.26      +58 -7     spyderMQ/src/java/org/spyderMQ/JMSServerQueue.java
  
  Index: JMSServerQueue.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/JMSServerQueue.java,v
  retrieving revision 1.25
  retrieving revision 1.26
  diff -u -r1.25 -r1.26
  --- JMSServerQueue.java       2000/05/18 00:16:10     1.25
  +++ JMSServerQueue.java       2000/05/18 20:20:56     1.26
  @@ -18,7 +18,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.25 $
  + *   @version $Revision: 1.26 $
    */
   public class JMSServerQueue
   {
  @@ -39,8 +39,11 @@
        SpyDistributedConnection temporaryDestination;
        //The JMSServer object
        private JMSServer server;
  +     //Am I a queue or a topic  
  +     boolean isTopic;
  +     //List of messages waiting for acknowledgment
  +     private LinkedList messagesWaitingForAck;
        
  -     
        // Constructor ---------------------------------------------------
           
        JMSServerQueue(Destination dest,SpyDistributedConnection temporary,JMSServer 
server)
  @@ -52,6 +55,14 @@
                alreadyInTaskQueue=false;
                temporaryDestination=temporary;
                this.server=server;
  +             
  +             if (dest instanceof SpyTopic) {
  +                     isTopic=true;
  +                     messagesWaitingForAck=null;
  +             } else {
  +                     isTopic=false;
  +                     messagesWaitingForAck=new LinkedList();
  +             }
        }
        
        // Package protected ---------------------------------------------
  @@ -96,8 +107,13 @@
                                messages.add(i,mes);
                        }
                        
  -                     //if a thread is already working on this destination, I don't 
have to myself to the taskqueue
  -                     if (!threadWorking) notifyWorkers();                    
  +                     if (isTopic) {
  +                             //if a thread is already working on this destination, 
I don't have to myself to the taskqueue
  +                             if (!threadWorking) notifyWorkers();                   
                                 
  +                     } else {
  +                             Log.log("Queue: addMessage");
  +                     }
  +
                }
        }
   
  @@ -122,8 +138,12 @@
                threadWorking=false;            
                
                synchronized (messages) {
  -                     //notify another thread if there is work to do !
  -                     if (!messages.isEmpty()) notifyWorkers();
  +                     if (isTopic) {
  +                             //notify another thread if there is work to do !
  +                             if (!messages.isEmpty()) notifyWorkers();
  +                     } else {
  +                             Log.log("Queue: endWork");
  +                     }
                }
        }
   
  @@ -168,7 +188,7 @@
                }
        }
        
  -     //A connection is closing !
  +     //A connection is closing
        void connectionClosing(SpyDistributedConnection dc)
        {
                if (!subscribers.contains(dc)) return;
  @@ -198,6 +218,37 @@
                
                //remove this connection from the list
                i.remove();
  +     }
  +     
  +     void doMyJob()
  +     {                       
  +             if (isTopic) {                  
  +                     
  +                     //Clear the message queue
  +                     SpyMessage[] msgs=startWork();                  
  +                             
  +                     //Let the thread do its work
  +                     if (msgs.length>1) {
  +                             //We can send multiple messages
  +                             Log.log("Send Msgs[1.."+msgs.length+"]");
  +                             sendMultipleMessages(msgs);
  +                     } else {
  +                             //Send each message
  +                             for(int i=0;i<msgs.length;i++) {
  +                                     SpyMessage message=(SpyMessage)msgs[i];
  +                                     Log.log("Send one 
msg("+message.toString()+")");
  +                                     if (!message.isOutdated()) 
sendOneMessage(message);
  +                             }                       
  +                     }
  +                             
  +                     //Notify that it has finished its work : another thread can 
start working on this queue
  +                     endWork();
  +                     
  +             } else {
  +                     
  +                     Log.log("Queue :)");
  +                     
  +             }
        }
                
   }
  
  
  
  1.14      +15 -15    spyderMQ/src/java/org/spyderMQ/SessionQueue.java
  
  Index: SessionQueue.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SessionQueue.java,v
  retrieving revision 1.13
  retrieving revision 1.14
  diff -u -r1.13 -r1.14
  --- SessionQueue.java 2000/05/17 23:43:08     1.13
  +++ SessionQueue.java 2000/05/18 20:20:57     1.14
  @@ -18,7 +18,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.13 $
  + *   @version $Revision: 1.14 $
    */
   public class SessionQueue
   {
  @@ -29,7 +29,7 @@
        //List of Pending messages (not yet delivered)
        LinkedList messages;
        //List of messages waiting for acknoledgment
  -     LinkedList messagesWaitForAck;
  +     LinkedList messagesWaitingForAck;
        //Is the consumer sleeping in a receive() ?
        boolean waitInReceive;
        //Is the session transacted ?
  @@ -42,7 +42,7 @@
        SessionQueue(boolean tr,int am)
        {
                messages=new LinkedList();
  -             messagesWaitForAck=new LinkedList();
  +             messagesWaitingForAck=new LinkedList();
                waitInReceive=false;
                transacted=tr;
                acknowledgeMode=am;             
  @@ -140,9 +140,9 @@
                                        if (!transacted) {
                                                if 
(acknowledgeMode==Session.CLIENT_ACKNOWLEDGE) {
                                                                
  -                                                     synchronized 
(messagesWaitForAck) {
  +                                                     synchronized 
(messagesWaitingForAck) {
                                                                //Put the message in 
the messagesWaitForAck queue
  -                                                             
messagesWaitForAck.addLast(message);
  +                                                             
messagesWaitingForAck.addLast(message);
                                                        }
                                                                
                                                        message.setSessionQueue(this);
  @@ -157,9 +157,9 @@
                                                        
                                                //We are linked to a transacted 
session                                                                                
 
                                                        
  -                                             synchronized (messagesWaitForAck) {
  +                                             synchronized (messagesWaitingForAck) {
                                                        //Put the message in the 
messagesWaitForAck queue
  -                                                     
messagesWaitForAck.addLast(message);
  +                                                     
messagesWaitingForAck.addLast(message);
                                                }
                                                        
                                        }
  @@ -181,14 +181,14 @@
        {
                Log.log("SessionQueue: acknowledge("+mes.toString()+")");
                
  -             synchronized (messagesWaitForAck) {
  +             synchronized (messagesWaitingForAck) {
                        
  -                     int pos=messagesWaitForAck.indexOf(mes);
  +                     int pos=messagesWaitingForAck.indexOf(mes);
                        
                        if (pos==-1) return;
                        
                        for(int i=0;i<=pos;i++)
  -                             messagesWaitForAck.removeFirst();
  +                             messagesWaitingForAck.removeFirst();
                        
                }                               
                
  @@ -198,12 +198,12 @@
        void recover() throws JMSException
        {
                synchronized (messages) {
  -                     synchronized (messagesWaitForAck) {
  +                     synchronized (messagesWaitingForAck) {
                                
  -                             while (messagesWaitForAck.size()!=0) {
  +                             while (messagesWaitingForAck.size()!=0) {
                                        
                                        //Get the most recent unacknowledged message
  -                                     SpyMessage 
mes=(SpyMessage)messagesWaitForAck.removeLast();
  +                                     SpyMessage 
mes=(SpyMessage)messagesWaitingForAck.removeLast();
                                        
                                        //This message is redelivered                  
                 
                                        mes.setJMSRedelivered(true);
  @@ -220,8 +220,8 @@
        //the session is about to commit, we have to clear our messagesWaitForAck queue
        void commit()
        {
  -             synchronized (messagesWaitForAck) {
  -                     messagesWaitForAck.clear();
  +             synchronized (messagesWaitingForAck) {
  +                     messagesWaitingForAck.clear();
                }
        }       
        
  
  
  
  1.10      +12 -97    spyderMQ/src/java/org/spyderMQ/SpyMessageConsumer.java
  
  Index: SpyMessageConsumer.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyMessageConsumer.java,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -r1.9 -r1.10
  --- SpyMessageConsumer.java   2000/05/17 23:43:08     1.9
  +++ SpyMessageConsumer.java   2000/05/18 20:20:57     1.10
  @@ -15,11 +15,11 @@
   import org.spydermq.selectors.Selector;
   
   /**
  - *   This class implements javax.jms.MessageConsumer
  + *   This class implements javax.jms.MessageConsumer - Going to be deprecated
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.9 $
  + *   @version $Revision: 1.10 $
    */
   public class SpyMessageConsumer 
        implements MessageConsumer
  @@ -30,8 +30,6 @@
        protected SpySession session;
        //My message listener (null if none)
        public MessageListener messageListener;
  -     //A link to my session queue (in my session)
  -     protected SessionQueue mySessionQueue;
        //Am I closed ?
        protected boolean closed;
        //Do I have a selector
  @@ -39,10 +37,9 @@
        
        // Constructor ---------------------------------------------------
           
  -     SpyMessageConsumer(SpySession s,SessionQueue sq)
  +     SpyMessageConsumer(SpySession s)
        {
                session=s;
  -             mySessionQueue=sq;
                messageListener=null;
                closed=false;
                selector=null;
  @@ -66,113 +63,31 @@
        }
   
       public void setMessageListener(MessageListener listener) throws JMSException
  -     {               
  -             if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");   
  -             
  -             messageListener=listener;
  -             
  -             //Signal the change to the session thread ( it could sleep, while 
there are messages for him )
  -             synchronized (session.thread) {
  -                     session.thread.notify();
  -             }
  +     {       
  +             //Job is done in the inherited classes
        }
   
       public Message receive() throws JMSException
        {
  -             if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");   
  -             
  -             synchronized (mySessionQueue.messages) {
  -                     
  -                     //if the client follows the specification [4.4.6], he cannot 
use this session
  -                     //to asynchronously receive a message or receive() from 
another thread.
  -                     //If a message is already pending for this session, we can 
immediatly deliver it 
  -                     
  -                     while (true) {
  -                             
  -                             if (closed) return null;
  -                             
  -                             if (!session.modeStop) {
  -                                     Message 
mes=mySessionQueue.getMessage(selector);
  -                                     if (mes!=null) return mes;
  -                             } else Log.log("the connection is stopped !");
  -                             
  -                             try {
  -                                     mySessionQueue.waitInReceive=true;
  -                                     mySessionQueue.messages.wait();
  -                             } catch (InterruptedException e) {
  -                             } finally {
  -                                     mySessionQueue.waitInReceive=false;
  -                             }
  -                             
  -                     }
  -             }
  +             //Job is done in the inherited classes
  +             return null;
        }
   
       public Message receive(long timeOut) throws JMSException
        {
  -             if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");   
  -             
  -             if (timeOut==0) return receive();
  -             
  -             long endTime=(new Date()).getTime()+timeOut;
  -             
  -             synchronized (mySessionQueue.messages) {
  -                     
  -                     //if the client respects the specification [4.4.6], he cannot 
use this session
  -                     //to asynchronously receive a message or receive() from 
another thread.
  -                     //If a message is already pending for this session, we can 
deliver it 
  -                                             
  -                     while (true) {
  -                             
  -                             if (closed) return null;
  -                             
  -                             if (!session.modeStop) {
  -                                     Message 
mes=mySessionQueue.getMessage(selector);
  -                                     if (mes!=null) return mes;
  -                             } else Log.log("the connection is stopped !");
  -                             
  -                             long att=endTime-((new Date()).getTime());
  -                             if (att<=0) return null;
  -                             
  -                             try {                                   
  -                                     mySessionQueue.waitInReceive=true;
  -                                     mySessionQueue.messages.wait(att);
  -                             } catch (InterruptedException e) {
  -                             } finally {
  -                                     mySessionQueue.waitInReceive=false;
  -                             }
  -                             
  -                     }
  -             }
  -             
  +             //Job is done in the inherited classes
  +             return null;
        }
   
       public Message receiveNoWait() throws JMSException
        {
  -             if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");   
  -             
  -             synchronized (mySessionQueue.messages) {
  -                     
  -                     while (true) {
  -                             if (session.modeStop) return null;
  -                             return mySessionQueue.getMessage(selector);
  -                     }
  -                                             
  -             }
  +             //Job is done in the inherited classes
  +             return null;
        }
   
       public synchronized void close() throws JMSException
        {
  -             if (closed) return;
  -             closed=true;
  -             
  -             if (mySessionQueue.waitInReceive&&messageListener==null) {
  -                     
  -                     //A consumer could be waiting in receive()
  -                     synchronized (mySessionQueue.messages) {
  -                             mySessionQueue.messages.notify();
  -                     }
  -             }
  +             //Job is done in the inherited classes          
        }
        
   }
  
  
  
  1.5       +11 -6     spyderMQ/src/java/org/spyderMQ/SpyQueueSession.java
  
  Index: SpyQueueSession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyQueueSession.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- SpyQueueSession.java      2000/05/15 02:56:26     1.4
  +++ SpyQueueSession.java      2000/05/18 20:20:57     1.5
  @@ -23,7 +23,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.4 $
  + *   @version $Revision: 1.5 $
    */
   public class SpyQueueSession 
        extends SpySession 
  @@ -34,7 +34,7 @@
           
        SpyQueueSession(SpyConnection myConnection, boolean transacted, int 
acknowledgeMode, boolean stop)
        {
  -             super(myConnection,transacted,acknowledgeMode,stop);
  +             super(myConnection,transacted,acknowledgeMode,stop,false);
        }
   
        // Public --------------------------------------------------------
  @@ -66,8 +66,8 @@
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                                
  -             //Not implemented yet
  -             return null;
  +             SpyQueueReceiver receiver=new SpyQueueReceiver(this,queue,modeStop);
  +             return receiver;
        }
   
       public QueueReceiver createReceiver(Queue queue, String messageSelector) throws 
JMSException
  @@ -138,8 +138,8 @@
                        
                }
   
  -             //notify the thread that there is work to do
  -             //we should change this...
  +             //Notify the [sleeping ?] thread that there is work to do
  +             //We should not wait for the lock...
                synchronized (thread)
                {
                        thread.notify();
  @@ -156,6 +156,11 @@
        void removeConsumer(Destination dest, SessionQueue who) throws JMSException
        {
                //Not implemented yet
  +     }
  +     
  +     //One receiver is changing its mode
  +     void notifyStopChange(SpyQueueReceiver receiver,boolean newMode)
  +     {
        }
        
   }
  
  
  
  1.13      +38 -120   spyderMQ/src/java/org/spyderMQ/SpySession.java
  
  Index: SpySession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpySession.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -r1.12 -r1.13
  --- SpySession.java   2000/05/15 19:40:17     1.12
  +++ SpySession.java   2000/05/18 20:20:57     1.13
  @@ -28,7 +28,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.12 $
  + *   @version $Revision: 1.13 $
    */
   public class SpySession 
        implements Runnable, Session
  @@ -56,10 +56,12 @@
        boolean closed;
        //This object is the object used to synchronize the session's thread
        public Integer thread;
  +     //Am I linked to a topic
  +     public boolean isTopic;
   
        // Constructor ---------------------------------------------------
           
  -     SpySession(SpyConnection conn, boolean trans, int acknowledge, boolean stop)
  +     SpySession(SpyConnection conn, boolean trans, int acknowledge, boolean 
stop,boolean isTopic)
        {
                connection=conn;
                transacted=trans;
  @@ -70,6 +72,7 @@
                modeStop=stop;
                closed=false;
                thread=new Integer(0);
  +             this.isTopic=isTopic;
                        
                //Start one thread for each session
                Thread oneThread=new Thread(this);
  @@ -146,111 +149,7 @@
                return transacted;
        }
   
  -     //Commit a transacted session
  -    public synchronized void commit() throws JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The session is closed");  
         
  -             if (!transacted) throw new IllegalStateException("The session is not 
transacted");
  -
  -             Log.log("Session: commit()");
  -
  -             boolean modeSav=modeStop;
  -             modeStop=true;
  -             
  -             //Wait for the thread to sleep
  -             synchronized (thread) {
  -                     
  -                     //Move the outgoing messages from the outgoingQueue to the 
outgoingCommitedQueue
  -                     outgoingCommitedQueue.addAll(outgoingQueue);
  -                     outgoingQueue.clear();
  -                     
  -                     //Notify each SessionQueue that we are going to commit
  -                     Collection values = subscribers.values();
  -                     Iterator i1=values.iterator();
  -                     while (i1.hasNext()) {
  -                             HashSet set=(HashSet)i1.next();
  -                             Iterator i2=set.iterator();
  -                             while (i2.hasNext()) {
  -                                     SessionQueue 
sessionQueue=(SessionQueue)i2.next();
  -                                     sessionQueue.commit();
  -                             }
  -                     }       
  -                     
  -                     //We have finished our work, we can wake up the thread
  -                     modeStop=modeSav;
  -                     thread.notify();
  -             }
  -             
  -     }
  -
  -     //Rollback a transacted session
  -    public synchronized void rollback() throws JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The session is closed");  
         
  -             if (!transacted) throw new IllegalStateException("The session is not 
transacted");
  -                                                                      
  -             Log.log("Session: rollback()");
  -
  -             boolean modeSav=modeStop;
  -             modeStop=true;
  -             
  -             //Wait for the thread to sleep
  -             synchronized (thread) {
  -                     
  -                     //Clear the outgoing queue
  -                     outgoingQueue.clear();
  -                     
  -                     //Notify each SessionQueue that we are going to rollback
  -                     Collection values = subscribers.values();
  -                     Iterator i1=values.iterator();
  -                     while (i1.hasNext()) {
  -                             HashSet set=(HashSet)i1.next();
  -                             Iterator i2=set.iterator();
  -                             while (i2.hasNext()) {
  -                                     SessionQueue 
sessionQueue=(SessionQueue)i2.next();
  -                                     sessionQueue.recover();
  -                             }
  -                     }       
  -                     
  -                     //We have finished our work, we can wake up the thread
  -                     modeStop=modeSav;
  -                     thread.notify();
  -             }
  -     }
  -
  -    public synchronized void recover() throws JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The session is closed");
  -             if (transacted) throw new IllegalStateException("The session is 
transacted");
  -                                                                      
  -             Log.log("Session: recover()");
   
  -             boolean modeSav=modeStop;
  -             modeStop=true;
  -             
  -             //Wait for the thread to sleep
  -             synchronized (thread) {
  -                     
  -                     //Notify each SessionQueue that we are going to recover
  -                     Collection values = subscribers.values();
  -                     Iterator i1=values.iterator();
  -                     while (i1.hasNext()) {
  -                             HashSet set=(HashSet)i1.next();
  -                             Iterator i2=set.iterator();
  -                             while (i2.hasNext()) {
  -                                     SessionQueue 
sessionQueue=(SessionQueue)i2.next();
  -                                     sessionQueue.recover();
  -                             }
  -                     }       
  -                     
  -                     //We have finished our work, we can wake up the thread
  -                     modeStop=modeSav;
  -                     thread.notify();
  -             }
  -             
  -             
  -     }
  -
       public MessageListener getMessageListener() throws JMSException
        {               
                if (closed) throw new IllegalStateException("The session is closed");  
         
  @@ -309,21 +208,25 @@
                                        }
                                }
                                
  -                             //if we are not in stopped mode, look at incoming 
queues
  +                             //there is no incoming queue in the SpyQueueSession    
                         
                                
  -                             if (!modeStop) {
  +                             if (isTopic) {
                                        
  -                                     Collection values = subscribers.values();
  -                                     Iterator i1=values.iterator();
  -                                     while (i1.hasNext()) {
  -                                             HashSet set=(HashSet)i1.next();
  -                                             Iterator i2=set.iterator();
  -                                             while (i2.hasNext()) {
  -                                                     SessionQueue 
sessionQueue=(SessionQueue)i2.next();
  -                                                     
doneJob=doneJob||sessionQueue.deliverMessage();
  -                                             }
  -                                     }       
  -                                                                                    
 
  +                                     //if we are not in stopped mode, look at the 
incoming queue
  +                                     if (!modeStop) {
  +                                             
  +                                             Collection values = 
subscribers.values();
  +                                             Iterator i1=values.iterator();
  +                                             while (i1.hasNext()) {
  +                                                     HashSet set=(HashSet)i1.next();
  +                                                     Iterator i2=set.iterator();
  +                                                     while (i2.hasNext()) {
  +                                                             SessionQueue 
sessionQueue=(SessionQueue)i2.next();
  +                                                             
doneJob=doneJob||sessionQueue.deliverMessage();
  +                                                     }
  +                                             }       
  +                                                                                    
         
  +                                     }
                                }
                                        
                                //If there were smthg to do, try again
  @@ -371,9 +274,24 @@
   
        public void dispatchMessage(Destination dest, SpyMessage mes) throws 
JMSException
        {
  -             //The job is done in the inherited classes
  +             //The job is done in inherited classes
        }       
        
  +     public void commit() throws JMSException
  +     {
  +             //The job is done in inherited classes
  +     }       
  +
  +     public void rollback() throws JMSException
  +     {
  +             //The job is done in inherited classes
  +     }       
  +
  +     public void recover() throws JMSException
  +     {
  +             //The job is done in inherited classes
  +     }       
  +
        public void deleteTemporaryDestination(SpyDestination dest) throws JMSException
        {       
                Log.log("SpySession: deleteDestination(dest="+dest.toString()+")");
  
  
  
  1.8       +2 -1      spyderMQ/src/java/org/spyderMQ/SpyTopicPublisher.java
  
  Index: SpyTopicPublisher.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyTopicPublisher.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- SpyTopicPublisher.java    2000/05/15 02:08:59     1.7
  +++ SpyTopicPublisher.java    2000/05/18 20:20:57     1.8
  @@ -18,7 +18,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.7 $
  + *   @version $Revision: 1.8 $
    */
   public class SpyTopicPublisher 
        extends SpyMessageProducer 
  @@ -94,4 +94,5 @@
                //We must put a 'new message' in the Session's outgoing queue [3.9]    
                         
                mySession.sendMessage(message.myClone());
        }
  +             
   }
  
  
  
  1.21      +110 -2    spyderMQ/src/java/org/spyderMQ/SpyTopicSession.java
  
  Index: SpyTopicSession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyTopicSession.java,v
  retrieving revision 1.20
  retrieving revision 1.21
  diff -u -r1.20 -r1.21
  --- SpyTopicSession.java      2000/05/17 23:43:08     1.20
  +++ SpyTopicSession.java      2000/05/18 20:20:57     1.21
  @@ -13,6 +13,7 @@
   import javax.jms.JMSException;
   import javax.jms.TopicPublisher;
   import javax.jms.TemporaryTopic;
  +import java.util.Collection;
   import java.util.HashSet;
   import java.util.HashMap;
   import java.util.Iterator;
  @@ -22,7 +23,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.20 $
  + *   @version $Revision: 1.21 $
    */
   public class SpyTopicSession 
        extends SpySession 
  @@ -33,7 +34,7 @@
           
        SpyTopicSession(SpyConnection myConnection, boolean transacted, int 
acknowledgeMode, boolean stop)
        {
  -             super(myConnection,transacted,acknowledgeMode,stop);
  +             super(myConnection,transacted,acknowledgeMode,stop,true);
        }
   
        // Public --------------------------------------------------------
  @@ -96,6 +97,113 @@
       public void unsubscribe(String name) throws JMSException
        {
                //Not implemented yet
  +     }
  +     
  +     //overides SpySession
  +     
  +     //Commit a transacted session
  +    public synchronized void commit() throws JMSException
  +     {
  +             if (closed) throw new IllegalStateException("The session is closed");  
         
  +             if (!transacted) throw new IllegalStateException("The session is not 
transacted");
  +
  +             Log.log("Session: commit()");
  +
  +             boolean modeSav=modeStop;
  +             modeStop=true;
  +             
  +             //Wait for the thread to sleep
  +             synchronized (thread) {
  +                     
  +                     //Move the outgoing messages from the outgoingQueue to the 
outgoingCommitedQueue
  +                     outgoingCommitedQueue.addAll(outgoingQueue);
  +                     outgoingQueue.clear();
  +                     
  +                     //Notify each SessionQueue that we are going to commit
  +                     Collection values = subscribers.values();
  +                     Iterator i1=values.iterator();
  +                     while (i1.hasNext()) {
  +                             HashSet set=(HashSet)i1.next();
  +                             Iterator i2=set.iterator();
  +                             while (i2.hasNext()) {
  +                                     SessionQueue 
sessionQueue=(SessionQueue)i2.next();
  +                                     sessionQueue.commit();
  +                             }
  +                     }       
  +                     
  +                     //We have finished our work, we can wake up the thread
  +                     modeStop=modeSav;
  +                     thread.notify();
  +             }
  +             
  +     }
  +
  +     //Rollback a transacted session
  +    public synchronized void rollback() throws JMSException
  +     {
  +             if (closed) throw new IllegalStateException("The session is closed");  
         
  +             if (!transacted) throw new IllegalStateException("The session is not 
transacted");
  +                                                                      
  +             Log.log("Session: rollback()");
  +
  +             boolean modeSav=modeStop;
  +             modeStop=true;
  +             
  +             //Wait for the thread to sleep
  +             synchronized (thread) {
  +                     
  +                     //Clear the outgoing queue
  +                     outgoingQueue.clear();
  +                     
  +                     //Notify each SessionQueue that we are going to rollback
  +                     Collection values = subscribers.values();
  +                     Iterator i1=values.iterator();
  +                     while (i1.hasNext()) {
  +                             HashSet set=(HashSet)i1.next();
  +                             Iterator i2=set.iterator();
  +                             while (i2.hasNext()) {
  +                                     SessionQueue 
sessionQueue=(SessionQueue)i2.next();
  +                                     sessionQueue.recover();
  +                             }
  +                     }       
  +                     
  +                     //We have finished our work, we can wake up the thread
  +                     modeStop=modeSav;
  +                     thread.notify();
  +             }
  +     }
  +
  +    public synchronized void recover() throws JMSException
  +     {
  +             if (closed) throw new IllegalStateException("The session is closed");
  +             if (transacted) throw new IllegalStateException("The session is 
transacted");
  +                                                                      
  +             Log.log("Session: recover()");
  +
  +             boolean modeSav=modeStop;
  +             modeStop=true;
  +             
  +             //Wait for the thread to sleep
  +             synchronized (thread) {
  +                     
  +                     //Notify each SessionQueue that we are going to recover
  +                     Collection values = subscribers.values();
  +                     Iterator i1=values.iterator();
  +                     while (i1.hasNext()) {
  +                             HashSet set=(HashSet)i1.next();
  +                             Iterator i2=set.iterator();
  +                             while (i2.hasNext()) {
  +                                     SessionQueue 
sessionQueue=(SessionQueue)i2.next();
  +                                     sessionQueue.recover();
  +                             }
  +                     }       
  +                     
  +                     //We have finished our work, we can wake up the thread
  +                     modeStop=modeSav;
  +                     thread.notify();
  +             }
  +             
  +             
        }
        
        //Not part of the spec
  
  
  
  1.8       +117 -3    spyderMQ/src/java/org/spyderMQ/SpyTopicSubscriber.java
  
  Index: SpyTopicSubscriber.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyTopicSubscriber.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- SpyTopicSubscriber.java   2000/05/17 23:43:08     1.7
  +++ SpyTopicSubscriber.java   2000/05/18 20:20:57     1.8
  @@ -9,6 +9,9 @@
   import javax.jms.TopicSubscriber;
   import javax.jms.JMSException;
   import javax.jms.Topic;
  +import javax.jms.Message;
  +import javax.jms.MessageListener;
  +import java.util.Date;
   
   import org.spydermq.selectors.Selector;
   
  @@ -17,7 +20,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.7 $
  + *   @version $Revision: 1.8 $
    */
   public class SpyTopicSubscriber 
        extends SpyMessageConsumer 
  @@ -27,13 +30,16 @@
   
        //The topic I registered
        private Topic topic;
  +     //A link to my session queue (in my session)
  +     private SessionQueue mySessionQueue;
   
        // Constructor ---------------------------------------------------
           
       SpyTopicSubscriber(SpyTopicSession s,SessionQueue sq,Topic t) 
        {
  -             super(s,sq);
  +             super(s);
                topic=t;
  +             mySessionQueue=sq;
        }
   
        // Public --------------------------------------------------------
  @@ -55,8 +61,116 @@
        
        public void close() throws JMSException
        {               
  +             if (closed) return;
  +             closed=true;
  +
                session.removeConsumer(topic,mySessionQueue);
  -             super.close();
  +             
  +             if (mySessionQueue.waitInReceive&&messageListener==null) {
  +                     
  +                     //A consumer could be waiting in receive()
  +                     synchronized (mySessionQueue.messages) {
  +                             mySessionQueue.messages.notify();
  +                     }
  +             }
  +     }
  +     
  +     //Overrides MessageConsumer
  +     
  +    public Message receive() throws JMSException
  +     {
  +             if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");   
  +             
  +             synchronized (mySessionQueue.messages) {
  +                     
  +                     //if the client follows the specification [4.4.6], he cannot 
use this session
  +                     //to asynchronously receive a message or receive() from 
another thread.
  +                     //If a message is already pending for this session, we can 
immediatly deliver it 
  +                     
  +                     while (true) {
  +                             
  +                             if (closed) return null;
  +                             
  +                             if (!session.modeStop) {
  +                                     Message 
mes=mySessionQueue.getMessage(selector);
  +                                     if (mes!=null) return mes;
  +                             } else Log.log("the connection is stopped !");
  +                             
  +                             try {
  +                                     mySessionQueue.waitInReceive=true;
  +                                     mySessionQueue.messages.wait();
  +                             } catch (InterruptedException e) {
  +                             } finally {
  +                                     mySessionQueue.waitInReceive=false;
  +                             }
  +                             
  +                     }
  +             }
  +     }
  +
  +    public Message receive(long timeOut) throws JMSException
  +     {
  +             if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");   
  +             
  +             if (timeOut==0) return receive();
  +             
  +             long endTime=(new Date()).getTime()+timeOut;
  +             
  +             synchronized (mySessionQueue.messages) {
  +                     
  +                     //if the client respects the specification [4.4.6], he cannot 
use this session
  +                     //to asynchronously receive a message or receive() from 
another thread.
  +                     //If a message is already pending for this session, we can 
deliver it 
  +                                             
  +                     while (true) {
  +                             
  +                             if (closed) return null;
  +                             
  +                             if (!session.modeStop) {
  +                                     Message 
mes=mySessionQueue.getMessage(selector);
  +                                     if (mes!=null) return mes;
  +                             } else Log.log("the connection is stopped !");
  +                             
  +                             long att=endTime-((new Date()).getTime());
  +                             if (att<=0) return null;
  +                             
  +                             try {                                   
  +                                     mySessionQueue.waitInReceive=true;
  +                                     mySessionQueue.messages.wait(att);
  +                             } catch (InterruptedException e) {
  +                             } finally {
  +                                     mySessionQueue.waitInReceive=false;
  +                             }
  +                             
  +                     }
  +             }
  +             
  +     }
  +
  +    public Message receiveNoWait() throws JMSException
  +     {
  +             if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");   
  +             
  +             synchronized (mySessionQueue.messages) {
  +                     
  +                     while (true) {
  +                             if (session.modeStop) return null;
  +                             return mySessionQueue.getMessage(selector);
  +                     }
  +                                             
  +             }
  +     }
  +
  +    public void setMessageListener(MessageListener listener) throws JMSException
  +     {       
  +             if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");   
  +             
  +             messageListener=listener;
  +             
  +             //Signal the change to the session thread ( it could sleep, while 
there are messages for him )
  +             synchronized (session.thread) {
  +                     session.thread.notify();
  +             }
        }
        
        // ----- Debug only ----- [not part of the spec]
  
  
  

Reply via email to