User: norbert 
  Date: 00/05/17 16:43:09

  Modified:    src/java/org/spyderMQ JMSServer.java SessionQueue.java
                        SpyMessageConsumer.java SpyQueue.java SpyTopic.java
                        SpyTopicSession.java SpyTopicSubscriber.java
  Log:
  Very basic selector system ( there's no parser yet )
  
  Revision  Changes    Path
  1.27      +3 -3      spyderMQ/src/java/org/spyderMQ/JMSServer.java
  
  Index: JMSServer.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/JMSServer.java,v
  retrieving revision 1.26
  retrieving revision 1.27
  diff -u -r1.26 -r1.27
  --- JMSServer.java    2000/05/16 18:04:58     1.26
  +++ JMSServer.java    2000/05/17 23:43:08     1.27
  @@ -20,7 +20,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.26 $
  + *   @version $Revision: 1.27 $
    */
   public class JMSServer 
                implements Runnable 
  @@ -267,7 +267,7 @@
                return q;
        }
   
  -     //A clonnection is closing
  +     //A connection is closing [error or notification]
        public synchronized void connectionClosing(SpyDistributedConnection dc)
        {
                if (dc==null) return;
  @@ -294,7 +294,7 @@
                                                SpyDistributedConnection 
dc2=(SpyDistributedConnection)i2.next();
                                                if (dc.equals(dc2)) {
                                                        Log.log("FIXME: The 
DistributedConnection is registered !");
  -                                                     //Remove it...
  +                                                     //Remove it !!!!!
                                                }
                                        }
                                }
  
  
  
  1.13      +44 -35    spyderMQ/src/java/org/spyderMQ/SessionQueue.java
  
  Index: SessionQueue.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SessionQueue.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -r1.12 -r1.13
  --- SessionQueue.java 2000/05/15 17:53:58     1.12
  +++ SessionQueue.java 2000/05/17 23:43:08     1.13
  @@ -11,20 +11,21 @@
   import javax.jms.JMSException;
   import javax.jms.Session;
   import java.util.LinkedList;
  +import org.spydermq.selectors.Selector;
   
   /**
    *   This class is a message queue which is stored (hashed by Destination) in the 
SpySession object
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.12 $
  + *   @version $Revision: 1.13 $
    */
   public class SessionQueue
   {
        // Attributes ----------------------------------------------------
   
        //the MessageConsumer of this queue
  -     MessageConsumer destination;
  +     SpyMessageConsumer destination;
        //List of Pending messages (not yet delivered)
        LinkedList messages;
        //List of messages waiting for acknoledgment
  @@ -49,7 +50,7 @@
   
        // Package protected ---------------------------------------------
            
  -     void setConsumer(MessageConsumer consumer)
  +     void setConsumer(SpyMessageConsumer consumer)
        {
                destination=consumer;
        }
  @@ -93,7 +94,7 @@
                                        if (!waitInReceive) return false;              
                 
                                        messages.notify();
                                } else {
  -                                     SpyMessage mes=getMessage();
  +                                     SpyMessage 
mes=getMessage(destination.selector);
                                        if (mes==null) return false;
                                        listener.onMessage(mes);
                                }
  @@ -107,56 +108,64 @@
                }
        }
        
  -     SpyMessage getMessage()
  +     SpyMessage getMessage(Selector selector)
        {
                synchronized (messages) {
                                
                        while (true) {
   
                                try {
  -                                     if (messages.size()==0) 
  -                                             return null;
  +                                     if (messages.size()==0) return null;
                                
                                        SpyMessage 
mes=(SpyMessage)messages.removeFirst();
                                
  -                                     if (!mes.isOutdated()) {
  +                                     if (mes.isOutdated()) {
  +                                             Log.log("SessionQueue: I dropped a 
message (timeout)");
  +                                             continue;
  +                                     }
  +                                             
  +                                     if (selector!=null) {
  +                                             if (!selector.test(mes)) {
  +                                                     Log.log("SessionQueue: I 
dropped a message (selector)");
  +                                                     continue;
  +                                             } else {
  +                                                     Log.log("SessionQueue: 
selector evaluates TRUE");
  +                                             }
  +                                     }
                                                
  -                                             //the SAME Message object is put in 
different SessionQueues
  -                                             //when we deliver it, we have to 
clone() it to insure independance
  -                                             SpyMessage message=mes.myClone();
  +                                     //the SAME Message object is put in different 
SessionQueues
  +                                     //when we deliver it, we have to clone() it to 
insure independance
  +                                     SpyMessage message=mes.myClone();
                                                                                       
                                                         
  -                                             if (!transacted) {
  -                                                     if 
(acknowledgeMode==Session.CLIENT_ACKNOWLEDGE) {
  +                                     if (!transacted) {
  +                                             if 
(acknowledgeMode==Session.CLIENT_ACKNOWLEDGE) {
                                                                
  -                                                             synchronized 
(messagesWaitForAck) {
  -                                                                     //Put the 
message in the messagesWaitForAck queue
  -                                                                     
messagesWaitForAck.addLast(message);
  -                                                             }
  -                                                             
  -                                                             
message.setSessionQueue(this);
  -                                                             
  -                                                     } else if 
(acknowledgeMode==Session.DUPS_OK_ACKNOWLEDGE) {
  -                                                             //DUPS_OK_ACKNOWLEDGE
  -                                                     } else {
  -                                                             //AUTO_ACKNOWLEDGE 
  -                                                             //we don't need to 
keep this message in a queue
  -                                                     }
  -                                             } else {
  -                                                     
  -                                                     //We are linked to a 
transacted session                                                                     
            
  -                                                     
                                                        synchronized 
(messagesWaitForAck) {
                                                                //Put the message in 
the messagesWaitForAck queue
                                                                
messagesWaitForAck.addLast(message);
                                                        }
  +                                                             
  +                                                     message.setSessionQueue(this);
  +                                                             
  +                                             } else if 
(acknowledgeMode==Session.DUPS_OK_ACKNOWLEDGE) {
  +                                                     //DUPS_OK_ACKNOWLEDGE
  +                                             } else {
  +                                                     //AUTO_ACKNOWLEDGE 
  +                                                     //we don't need to keep this 
message in a queue
  +                                             }
  +                                     } else {
                                                        
  +                                             //We are linked to a transacted 
session                                                                                
 
  +                                                     
  +                                             synchronized (messagesWaitForAck) {
  +                                                     //Put the message in the 
messagesWaitForAck queue
  +                                                     
messagesWaitForAck.addLast(message);
                                                }
  -                                             
  -                                             return message;
  +                                                     
                                        }
  -                             
  -                                     Log.log("Outdated message");
  -                                     
  +                                             
  +                                     return message;
  +                                                                             
                                } catch (Exception e) {
                                        Log.error(e);
                                }
  
  
  
  1.9       +8 -4      spyderMQ/src/java/org/spyderMQ/SpyMessageConsumer.java
  
  Index: SpyMessageConsumer.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyMessageConsumer.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- SpyMessageConsumer.java   2000/05/15 17:53:58     1.8
  +++ SpyMessageConsumer.java   2000/05/17 23:43:08     1.9
  @@ -12,13 +12,14 @@
   import javax.jms.Message;
   import java.util.LinkedList;
   import java.util.Date;
  +import org.spydermq.selectors.Selector;
   
   /**
    *   This class implements javax.jms.MessageConsumer
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.8 $
  + *   @version $Revision: 1.9 $
    */
   public class SpyMessageConsumer 
        implements MessageConsumer
  @@ -33,6 +34,8 @@
        protected SessionQueue mySessionQueue;
        //Am I closed ?
        protected boolean closed;
  +     //Do I have a selector
  +     public Selector selector;
        
        // Constructor ---------------------------------------------------
           
  @@ -42,6 +45,7 @@
                mySessionQueue=sq;
                messageListener=null;
                closed=false;
  +             selector=null;
        }
        
        // Public --------------------------------------------------------
  @@ -88,7 +92,7 @@
                                if (closed) return null;
                                
                                if (!session.modeStop) {
  -                                     Message mes=mySessionQueue.getMessage();
  +                                     Message 
mes=mySessionQueue.getMessage(selector);
                                        if (mes!=null) return mes;
                                } else Log.log("the connection is stopped !");
                                
  @@ -123,7 +127,7 @@
                                if (closed) return null;
                                
                                if (!session.modeStop) {
  -                                     Message mes=mySessionQueue.getMessage();
  +                                     Message 
mes=mySessionQueue.getMessage(selector);
                                        if (mes!=null) return mes;
                                } else Log.log("the connection is stopped !");
                                
  @@ -151,7 +155,7 @@
                        
                        while (true) {
                                if (session.modeStop) return null;
  -                             return mySessionQueue.getMessage();
  +                             return mySessionQueue.getMessage(selector);
                        }
                                                
                }
  
  
  
  1.5       +2 -2      spyderMQ/src/java/org/spyderMQ/SpyQueue.java
  
  Index: SpyQueue.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyQueue.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- SpyQueue.java     2000/05/15 02:41:47     1.4
  +++ SpyQueue.java     2000/05/17 23:43:08     1.5
  @@ -15,7 +15,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.4 $
  + *   @version $Revision: 1.5 $
    */
   public class SpyQueue
        extends SpyDestination 
  @@ -39,7 +39,7 @@
        
        public String toString()
        {
  -             return this.getClass().getName()+"@"+name;
  +             return "Queue@"+name;
        }
        
        // Object override -----------------------------------------------
  
  
  
  1.9       +2 -2      spyderMQ/src/java/org/spyderMQ/SpyTopic.java
  
  Index: SpyTopic.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyTopic.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- SpyTopic.java     2000/05/11 02:00:53     1.8
  +++ SpyTopic.java     2000/05/17 23:43:08     1.9
  @@ -15,7 +15,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.8 $
  + *   @version $Revision: 1.9 $
    */
   public class SpyTopic 
        extends SpyDestination 
  @@ -39,7 +39,7 @@
        
        public String toString()
        {
  -             return this.getClass().getName()+"@"+name;
  +             return "Topic@"+name;
        }
        
        // Object override -----------------------------------------------
  
  
  
  1.20      +2 -2      spyderMQ/src/java/org/spyderMQ/SpyTopicSession.java
  
  Index: SpyTopicSession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyTopicSession.java,v
  retrieving revision 1.19
  retrieving revision 1.20
  diff -u -r1.19 -r1.20
  --- SpyTopicSession.java      2000/05/17 00:28:05     1.19
  +++ SpyTopicSession.java      2000/05/17 23:43:08     1.20
  @@ -22,7 +22,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.19 $
  + *   @version $Revision: 1.20 $
    */
   public class SpyTopicSession 
        extends SpySession 
  @@ -50,7 +50,7 @@
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                                
                SessionQueue sessionQueue=new SessionQueue(transacted,acknowledgeMode);
  -             TopicSubscriber sub=new SpyTopicSubscriber(this,sessionQueue,topic);
  +             SpyTopicSubscriber sub=new SpyTopicSubscriber(this,sessionQueue,topic);
                sessionQueue.setConsumer(sub);
                addConsumer(topic,sessionQueue);
                return sub;
  
  
  
  1.7       +10 -1     spyderMQ/src/java/org/spyderMQ/SpyTopicSubscriber.java
  
  Index: SpyTopicSubscriber.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyTopicSubscriber.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- SpyTopicSubscriber.java   2000/05/04 23:39:36     1.6
  +++ SpyTopicSubscriber.java   2000/05/17 23:43:08     1.7
  @@ -10,12 +10,14 @@
   import javax.jms.JMSException;
   import javax.jms.Topic;
   
  +import org.spydermq.selectors.Selector;
  +
   /**
    *   This class implements javax.jms.TopicSubscriber
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.6 $
  + *   @version $Revision: 1.7 $
    */
   public class SpyTopicSubscriber 
        extends SpyMessageConsumer 
  @@ -55,6 +57,13 @@
        {               
                session.removeConsumer(topic,mySessionQueue);
                super.close();
  +     }
  +     
  +     // ----- Debug only ----- [not part of the spec]
  +     
  +     public void setSelector(Selector selector)
  +     {
  +             this.selector=selector;
        }
   
   }
  
  
  

Reply via email to