User: hiram   
  Date: 00/12/23 17:55:10

  Modified:    src/java/org/spydermq SpyTopicSubscriber.java
                        SpyTopicSession.java SpySession.java
                        SpyQueueSession.java SpyQueueReceiver.java
                        SpyMessageConsumer.java SpyConnectionConsumer.java
                        AcknowledgementRequest.java
  Log:
  ConnectionConsumer fixes and server synchronization optimizations.
  Spyder should now work with the ASF implementation Peter did.
  
  Revision  Changes    Path
  1.10      +1 -1      spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java
  
  Index: SpyTopicSubscriber.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -r1.9 -r1.10
  --- SpyTopicSubscriber.java   2000/12/23 15:48:16     1.9
  +++ SpyTopicSubscriber.java   2000/12/24 01:55:08     1.10
  @@ -20,7 +20,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.9 $
  + *   @version $Revision: 1.10 $
    */
   public class SpyTopicSubscriber 
        extends SpyMessageConsumer 
  @@ -49,7 +49,7 @@
           
        SpyTopicSubscriber(SpyTopicSession session,Topic topic,boolean noLocal, String 
selector, String durableSubscriptionName) 
        {
  -             super(session, (SpyTopic)topic);
  +             super(session);
                this.topic=topic;
   
                subscription.destination = (SpyDestination)topic;
  
  
  
  1.13      +3 -9      spyderMQ/src/java/org/spydermq/SpyTopicSession.java
  
  Index: SpyTopicSession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSession.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -r1.12 -r1.13
  --- SpyTopicSession.java      2000/12/23 15:48:15     1.12
  +++ SpyTopicSession.java      2000/12/24 01:55:08     1.13
  @@ -33,7 +33,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.12 $
  + *   @version $Revision: 1.13 $
    */
   public class SpyTopicSession 
        extends SpySession 
  @@ -73,7 +73,7 @@
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                
                SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,noLocal, 
messageSelector, null);
  -             addConsumer(topic,sub);
  +             addConsumer(sub);
   
                return sub;
        }
  @@ -83,7 +83,7 @@
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                
                SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,false, null, 
name);
  -             addConsumer(topic,sub);
  +             addConsumer(sub);
   
                return sub;
   
  @@ -94,7 +94,7 @@
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                
                SpyTopicSubscriber sub=new SpyTopicSubscriber(this,topic,noLocal, 
messageSelector, name);
  -             addConsumer(topic,sub);
  +             addConsumer(sub);
   
                return sub;
        }
  @@ -124,11 +124,5 @@
                return this;
        }
   
  -     public void setMessageListener(MessageListener listener) throws JMSException
  -     {
  -             
  -             super.setMessageListener(listener);
  -             sessionConsumer = new SpyTopicSubscriber(this, null, false,null,null);
   
  -     }
   }
  
  
  
  1.18      +24 -32    spyderMQ/src/java/org/spydermq/SpySession.java
  
  Index: SpySession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpySession.java,v
  retrieving revision 1.17
  retrieving revision 1.18
  diff -u -r1.17 -r1.18
  --- SpySession.java   2000/12/23 15:48:15     1.17
  +++ SpySession.java   2000/12/24 01:55:08     1.18
  @@ -34,7 +34,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.17 $
  + *   @version $Revision: 1.18 $
    */
   abstract public class SpySession 
        implements Runnable, Session, XASession
  @@ -204,7 +204,8 @@
        {
                if (closed) throw new IllegalStateException("The session is closed");  
                                          
                messageListener = listener;
  -
  +             sessionConsumer = new SpyMessageConsumer(this);
  +             
                mutex.notifyLock();
        }
        
  @@ -380,37 +381,6 @@
                return connection.getNewMessageID();
        }
        
  -
  -             
  -     void removeConsumer(Destination dest, SpyMessageConsumer who) throws 
JMSException
  -     {
  -             Log.log("Session: 
removeConsumer(Destination="+dest.toString()+",MessageConsumer="+who.toString()+")");
  -
  -             synchronized (connection) {                     
  -                     connection.removeConsumer( who );
  -             }
  -             
  -             consumers.remove( who );
  -     }
  -     
  -     void addConsumer(Destination dest, SpyMessageConsumer who) throws JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The session is closed");
  -                                                                             
  -             Log.log("Session: 
subscribe(dest="+dest.toString()+",MessageConsumer="+who.toString()+")");
  -
  -             synchronized (consumers) {
  -                     consumers.add( who );                   
  -             }
  -
  -             connection.addConsumer(who);
  -             
  -     }
  -
  -     
  -
  -
  -     
        //called by a MessageProducer object which needs to publish a message
        void sendMessage(SpyMessage m) throws JMSException {
                if (closed)
  @@ -440,5 +410,27 @@
                modeStop=newValue;
                
                mutex.notifyLock();
  +     }
  +
  +     void addConsumer(SpyMessageConsumer who) throws JMSException
  +     {
  +             if (closed) throw new IllegalStateException("The session is closed");
  +                                                                             
  +             synchronized (consumers) {
  +                     consumers.add( who );                   
  +             }
  +
  +             connection.addConsumer(who);
  +             
  +     }
  +
  +     void removeConsumer(SpyMessageConsumer who) throws JMSException
  +     {
  +
  +             synchronized (connection) {                     
  +                     connection.removeConsumer( who );
  +             }
  +             
  +             consumers.remove( who );
        }
   }
  
  
  
  1.11      +2 -8      spyderMQ/src/java/org/spydermq/SpyQueueSession.java
  
  Index: SpyQueueSession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueSession.java,v
  retrieving revision 1.10
  retrieving revision 1.11
  diff -u -r1.10 -r1.11
  --- SpyQueueSession.java      2000/12/23 15:48:15     1.10
  +++ SpyQueueSession.java      2000/12/24 01:55:08     1.11
  @@ -29,7 +29,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.10 $
  + *   @version $Revision: 1.11 $
    */
   public class SpyQueueSession 
        extends SpySession 
  @@ -73,7 +73,7 @@
                if (closed) throw new IllegalStateException("The session is closed");  
         
   
                SpyQueueReceiver receiver=new SpyQueueReceiver(this,queue,null);
  -             addConsumer(queue,receiver);
  +             addConsumer(receiver);
                
                return receiver;
        }
  @@ -83,7 +83,7 @@
                if (closed) throw new IllegalStateException("The session is closed");  
         
   
                SpyQueueReceiver receiver=new 
SpyQueueReceiver(this,queue,messageSelector);
  -             addConsumer(queue,receiver);
  +             addConsumer(receiver);
                
                return receiver;
        }
  @@ -109,11 +109,5 @@
                return this;
        }
   
  -     public void setMessageListener(MessageListener listener) throws JMSException
  -     {
  -             
  -             super.setMessageListener(listener);
  -             sessionConsumer = new SpyQueueReceiver(this, null,null);
   
  -     }
   }
  
  
  
  1.9       +3 -3      spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java
  
  Index: SpyQueueReceiver.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- SpyQueueReceiver.java     2000/12/23 15:48:14     1.8
  +++ SpyQueueReceiver.java     2000/12/24 01:55:08     1.9
  @@ -18,7 +18,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.8 $
  + *   @version $Revision: 1.9 $
    */
   public class SpyQueueReceiver extends SpyMessageConsumer implements QueueReceiver {
        // Attributes ----------------------------------------------------
  @@ -55,10 +55,10 @@
        // Constructor ---------------------------------------------------
   
        SpyQueueReceiver(SpyQueueSession session, Queue queue, String selector) {
  -             super(session, (SpyQueue) queue);
  +             super(session);
                this.queue = queue;
  -             
  -             subscription.durableSubscriptionName = null;
  +
  +             subscription.destination = (SpyDestination)queue;
                subscription.messageSelector = selector;
                subscription.durableSubscriptionName = null;
                subscription.noLocal = false;
  
  
  
  1.9       +13 -23    spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java
  
  Index: SpyMessageConsumer.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- SpyMessageConsumer.java   2000/12/23 15:48:15     1.8
  +++ SpyMessageConsumer.java   2000/12/24 01:55:08     1.9
  @@ -24,9 +24,9 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.8 $
  + *   @version $Revision: 1.9 $
    */
  -abstract public class SpyMessageConsumer implements MessageConsumer, SpyConsumer {
  +public class SpyMessageConsumer implements MessageConsumer, SpyConsumer {
   
        //Link to my session
        public SpySession session;
  @@ -35,25 +35,12 @@
        //Am I closed ?
        protected boolean closed;
        // The subscription structure should be fill out by the decendent
  -     Subscription subscription = new Subscription();
  +     public Subscription subscription = new Subscription();
   
        //List of Pending messages (not yet delivered)
        LinkedList messages;
   
  -     //The destination this consumer is getting messages from
  -     SpyDestination destination;
   
  -
  -     // Constructor ---------------------------------------------------
  -
  -     SpyMessageConsumer(SpySession s, SpyDestination dest) {
  -             session = s;
  -             destination = dest;
  -             messageListener = null;
  -             closed = false;
  -             messages = new LinkedList();
  -     }
  -
        // Public --------------------------------------------------------
   
        public String getMessageSelector() throws JMSException {
  @@ -86,8 +73,6 @@
                        throw new IllegalStateException("The MessageConsumer is 
closed");
                if (messageListener != null)
                        throw new JMSException("A message listener is already 
registered");
  -             if (destination == null)
  -                     throw new JMSException("No assigned destination.");
   
                Log.log("Subscription="+subscription);
                subscription.receiving = true;
  @@ -128,8 +113,6 @@
                        throw new IllegalStateException("The MessageConsumer is 
closed");
                if (messageListener != null)
                        throw new JMSException("A message listener is already 
registered");
  -             if (destination == null)
  -                     throw new JMSException("No assigned destination.");
   
                if (timeOut == 0)
                        return receive();
  @@ -183,8 +166,6 @@
                        throw new IllegalStateException("The MessageConsumer is 
closed");
                if (messageListener != null)
                        throw new JMSException("A message listener is already 
registered");
  -             if (destination == null)
  -                     throw new JMSException("No assigned destination.");
   
                subscription.receiving = true;
                try {
  @@ -214,8 +195,8 @@
                        if (closed)
                                return;
   
  -                     if (destination != null)
  -                             session.removeConsumer(destination, this);
  +                     if (subscription != null)
  +                             session.removeConsumer(this);
                                                
                        if ( subscription.receiving && messageListener == null) {
                                //A consumer could be waiting in receive()
  @@ -340,5 +321,14 @@
   
        public Subscription getSubscription() {
                return subscription;
  +     }
  +
  +     // Constructor ---------------------------------------------------
  +
  +     SpyMessageConsumer(SpySession s) {
  +             session = s;
  +             messageListener = null;
  +             closed = false;
  +             messages = new LinkedList();
        }
   }
  
  
  
  1.3       +23 -28    spyderMQ/src/java/org/spydermq/SpyConnectionConsumer.java
  
  Index: SpyConnectionConsumer.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyConnectionConsumer.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- SpyConnectionConsumer.java        2000/12/23 15:48:15     1.2
  +++ SpyConnectionConsumer.java        2000/12/24 01:55:09     1.3
  @@ -19,7 +19,7 @@
    *      
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   public class SpyConnectionConsumer implements javax.jms.ConnectionConsumer, 
SpyConsumer {
   
  @@ -36,7 +36,7 @@
        // Is the ConnectionConsumer closed?
        boolean closed;
        // The subscription info the consumer
  -     Subscription subscription;
  +     Subscription subscription = new Subscription();
   
        /**
         * SpyConnectionConsumer constructor comment.
  @@ -53,17 +53,20 @@
                subscription.messageSelector = messageSelector;
                subscription.durableSubscriptionName = null;
                subscription.noLocal = false;
  +             subscription.listening = true;
  +             subscription.receiving = false;
   
                connection.addConsumer(this);
  -
  +             connection.listenerChange(subscription.subscriptionId,true);
  +             
        }
   
        public void addMessage(SpyMessage mes) throws JMSException {
  +             Log.log(""+this+"->addMessage(mes="+mes+")");
                queue.addLast(mes);
        }
   
   
  -
        /**
         * close method comment.
         */
  @@ -80,41 +83,29 @@
        public javax.jms.ServerSessionPool getServerSessionPool() throws 
javax.jms.JMSException {
                return serverSessionPool;
        }
  -     
  -     public boolean isListening() {
  -             return true;
  -     }
  -     
  -     public boolean isReceiving() {
  -             return false;
  -     }
  -     
  -     public void processMessages() throws JMSException {
   
  -             ServerSession serverSession = serverSessionPool.getServerSession();
  -             SpySession spySession = (SpySession) serverSession.getSession();
  -             if (spySession.sessionConsumer == null)
  -                     throw new JMSException("Session did not have a set 
MessageListner");
  +             
  +     public void processMessages() throws JMSException {
   
  -             int loadCounter = 0;
  +             Log.log(""+this+"->processMessages()");
   
                Iterator iter = queue.iterator();
                while (iter.hasNext()) {
  -
  -                     loadCounter++;
                        SpyMessage message = (SpyMessage) iter.next();
  -                     spySession.sessionConsumer.addMessage(message);
   
  -                     if (loadCounter >= maxMessages) {
  +                     ServerSession serverSession = 
serverSessionPool.getServerSession();
  +                     SpySession spySession = (SpySession) 
serverSession.getSession();
   
  -                             serverSession.start();
  +                     if (spySession.sessionConsumer == null) {
  +                             Log.log(""+this+" Session did not have a set 
MessageListner");
  +                             throw new JMSException("Session did not have a set 
MessageListner");
  +                     }
   
  -                             serverSession = serverSessionPool.getServerSession();
  -                             spySession = (SpySession) serverSession.getSession();
  +                     spySession.sessionConsumer.addMessage(message);
  +                     spySession.sessionConsumer.subscription = subscription;
   
  -                             if (spySession.sessionConsumer == null)
  -                                     throw new JMSException("Session did not have a 
set MessageListner");
  -                     }
  +                     Log.log(""+this+" Starting the ServerSession.");
  +                     serverSession.start();
   
                }
   
  @@ -122,5 +113,9 @@
   
        public Subscription getSubscription() {
                return subscription;
  +     }
  +
  +     public String toString() {
  +             return "SpyConnectionConsumer:"+destination;
        }
   }
  
  
  
  1.2       +4 -0      spyderMQ/src/java/org/spydermq/AcknowledgementRequest.java
  
  Index: AcknowledgementRequest.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/AcknowledgementRequest.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- AcknowledgementRequest.java       2000/12/23 15:48:16     1.1
  +++ AcknowledgementRequest.java       2000/12/24 01:55:09     1.2
  @@ -17,7 +17,7 @@
    *
    * @author Hiram Chirino ([EMAIL PROTECTED])
    *
  - * @version $Revision: 1.1 $
  + * @version $Revision: 1.2 $
    */
   public class AcknowledgementRequest
        implements java.io.Serializable
  @@ -40,5 +40,9 @@
   
        public int hashCode() {
                return messageID.hashCode();
  +     }
  +
  +     public String toString() {
  +             return 
"AcknowledgementRequest:"+(isAck?"ACK":"NACK")+","+destination+","+messageID;
        }
   }
  
  
  

Reply via email to