User: hiram   
  Date: 00/12/27 09:02:27

  Modified:    src/java/org/spydermq Subscription.java
                        SpyTopicSubscriber.java SpySession.java
                        SpyQueueReceiver.java SpyMessageConsumer.java
  Log:
  Feature Add: Durable Topic Subscriptions now work!
  More work still has to be done with user management (who
  is allowed to create durable subscriptions).  The DurableSubscriptionExample
  class now works.
  
  Revision  Changes    Path
  1.2       +4 -1      spyderMQ/src/java/org/spydermq/Subscription.java
  
  Index: Subscription.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/Subscription.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- Subscription.java 2000/12/23 15:48:16     1.1
  +++ Subscription.java 2000/12/27 17:02:25     1.2
  @@ -16,7 +16,7 @@
    *      
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public class Subscription
        implements Serializable
  @@ -27,6 +27,8 @@
        public SpyDestination destination;
        // the selector which will filter out messages
        public String messageSelector;
  +     // Should this message destroy the subscription?
  +     public boolean destroyDurableSubscription;
   
        // this is not null if we want a durable subscription
        public String durableSubscriptionName;
  @@ -61,7 +63,7 @@
                                
                                if( !exclusive )
                                        return false;
  -                             if( !listening || !receiving )
  +                             if( !listening && !receiving )
                                        return false;
                                        
                        }
  @@ -79,4 +81,5 @@
                return true;
                
        }
  +
   }
  
  
  
  1.12      +1 -1      spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java
  
  Index: SpyTopicSubscriber.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java,v
  retrieving revision 1.11
  retrieving revision 1.12
  diff -u -r1.11 -r1.12
  --- SpyTopicSubscriber.java   2000/12/26 04:15:51     1.11
  +++ SpyTopicSubscriber.java   2000/12/27 17:02:25     1.12
  @@ -20,7 +20,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.11 $
  + *   @version $Revision: 1.12 $
    */
   public class SpyTopicSubscriber 
        extends SpyMessageConsumer 
  @@ -49,7 +49,7 @@
           
        SpyTopicSubscriber(SpyTopicSession session,Topic topic,boolean noLocal, String 
selector, String durableSubscriptionName) 
        {
  -             super(session);
  +             super(session, false);
                this.topic=topic;
   
                subscription.destination = (SpyDestination)topic;
  
  
  
  1.22      +1 -6      spyderMQ/src/java/org/spydermq/SpySession.java
  
  Index: SpySession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpySession.java,v
  retrieving revision 1.21
  retrieving revision 1.22
  diff -u -r1.21 -r1.22
  --- SpySession.java   2000/12/26 20:11:04     1.21
  +++ SpySession.java   2000/12/27 17:02:25     1.22
  @@ -34,7 +34,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.21 $
  + *   @version $Revision: 1.22 $
    */
   abstract public class SpySession 
        implements Runnable, Session, XASession
  @@ -202,7 +202,7 @@
        public void setMessageListener(MessageListener listener) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
                                          
  -             sessionConsumer = new SpyMessageConsumer(this);
  +             sessionConsumer = new SpyMessageConsumer(this, true);
                sessionConsumer.setMessageListener(listener);
                mutex.notifyLock();
        }
  @@ -424,11 +424,6 @@
   
        void removeConsumer(SpyMessageConsumer who) throws JMSException
        {
  -             // The session consumer will remove itself from the session
  -             // but no action has to be taken.  The ConnectionConsumer will
  -             // unsubscribe itself from the provider.
  -             if( who == sessionConsumer ) 
  -                     return;
                
                synchronized (connection) {                     
                        connection.removeConsumer( who );
  
  
  
  1.12      +1 -15     spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java
  
  Index: SpyQueueReceiver.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java,v
  retrieving revision 1.11
  retrieving revision 1.12
  diff -u -r1.11 -r1.12
  --- SpyQueueReceiver.java     2000/12/26 04:15:50     1.11
  +++ SpyQueueReceiver.java     2000/12/27 17:02:26     1.12
  @@ -18,7 +18,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.11 $
  + *   @version $Revision: 1.12 $
    */
   public class SpyQueueReceiver extends SpyMessageConsumer implements QueueReceiver {
        // Attributes ----------------------------------------------------
  @@ -36,19 +36,10 @@
        }
   
   
  -     public void setMessageListener(MessageListener listener) throws JMSException {
  -             
  -             boolean change = (listener!=null) != subscription.listening;           
 
  -             super.setMessageListener(listener);
  -             if (change && queue != null)
  -                     session.connection.listenerChange(subscription.subscriptionId, 
subscription.listening);
  -                     
  -     }
  -
        // Constructor ---------------------------------------------------
   
        SpyQueueReceiver(SpyQueueSession session, Queue queue, String selector) {
  -             super(session);
  +             super(session, false);
                this.queue = queue;
   
                subscription.destination = (SpyDestination)queue;
  @@ -57,9 +48,4 @@
                subscription.noLocal = false;
        }
   
  -
  -
  -     public Subscription getSubscription() {
  -             return subscription;
  -     }
   }
  
  
  
  1.12      +39 -23    spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java
  
  Index: SpyMessageConsumer.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyMessageConsumer.java,v
  retrieving revision 1.11
  retrieving revision 1.12
  diff -u -r1.11 -r1.12
  --- SpyMessageConsumer.java   2000/12/26 04:15:50     1.11
  +++ SpyMessageConsumer.java   2000/12/27 17:02:26     1.12
  @@ -24,7 +24,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.11 $
  + *   @version $Revision: 1.12 $
    */
   public class SpyMessageConsumer implements MessageConsumer, SpyConsumer {
   
  @@ -40,6 +40,8 @@
        //List of Pending messages (not yet delivered)
        LinkedList messages;
   
  +     //Is this a session consumer?
  +     boolean sessionConusmer;
   
        // Public --------------------------------------------------------
   
  @@ -58,14 +60,20 @@
        }
   
        public void setMessageListener(MessageListener listener) throws JMSException {
  +             
                if (closed)
                        throw new IllegalStateException("The MessageConsumer is 
closed");
                if (subscription.receiving)
                        throw new JMSException("This MessageConsumer is waiting in 
receive() !");
   
  +             boolean change = (listener!=null) != subscription.listening;           
 
  +                     
                subscription.listening = listener!=null;
                messageListener = listener;
   
  +             if ( !sessionConusmer && change )  {
  +                     session.connection.listenerChange(subscription.subscriptionId, 
subscription.listening);
  +             }       
        }
   
        public Message receive() throws JMSException {
  @@ -194,7 +202,7 @@
                        if (closed)
                                return;
   
  -                     if (subscription != null)
  +                     if (!sessionConusmer)
                                session.removeConsumer(this);
                                                
                        if ( subscription.receiving && messageListener == null) {
  @@ -269,34 +277,38 @@
                        if (messages.size() == 0)
                                return false;
   
  -                     if (messageListener == null) {
  -                             if (!subscription.receiving) {
  +                     // Should we NACK the messages??
  +                     if( messageListener==null && !subscription.receiving && 
  +                                     (this instanceof SpyQueueReceiver || 
subscription.durableSubscriptionName!=null) ) {
   
  -                                     // If no Listener and No reciver is waiting 
for a message
  -                                     // Then we neg ack the message back to the 
server in the queue case.
  -                                     if (this instanceof SpyQueueReceiver) {
  -
  -                                             SpyMessage mes = getMessage();
  -                                             while (mes == null) {
  +                             SpyMessage mes = getMessage();
  +                             while (mes != null) {
   
  -                                                     Log.log("Got unrequested 
message, sending NACK for: " + mes);
  -                                                     AcknowledgementRequest item = 
new AcknowledgementRequest();
  -                                                     item.destination = 
mes.getJMSDestination();
  -                                                     item.messageID = 
mes.getJMSMessageID();
  -                                                     item.isAck = false;
  +                                     Log.log("Got unrequested message, sending NACK 
for: " + mes);
  +                                     AcknowledgementRequest item = new 
AcknowledgementRequest();
  +                                     item.destination = mes.getJMSDestination();
  +                                     item.messageID = mes.getJMSMessageID();
  +                                     item.isAck = false;
   
  -                                                     session.connection.send(item);
  +                                     session.connection.send(item);
   
  -                                                     mes = getMessage();
  -                                             }
  +                                     mes = getMessage();
  +                             }
  +                             
  +                             return false;
  +                     } else if (closed) { // If closed, dump the message 
   
  -                                     }
  -                                     return false;
  +                             SpyMessage mes = getMessage();
  +                             while (mes != null) {
  +                                     mes = getMessage();
                                }
  +                             
  +                             return false;
  +                     }
  +                                     
  +                     if (messageListener == null) {
                                messages.notify();
  -
                        } else {
  -
                                SpyMessage mes = getMessage();
                                if (mes == null)
                                        return false;
  @@ -307,7 +319,6 @@
                                } else if (session.acknowledgeMode == 
session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == session.DUPS_OK_ACKNOWLEDGE) {
                                        mes.doAcknowledge();
                                }
  -
                        }
                }
   
  @@ -324,10 +335,15 @@
   
        // Constructor ---------------------------------------------------
   
  -     SpyMessageConsumer(SpySession s) {
  +     SpyMessageConsumer(SpySession s, boolean sessionConusmer) {
                session = s;
  +             this.sessionConusmer=sessionConusmer;
                messageListener = null;
                closed = false;
                messages = new LinkedList();
  +     }
  +
  +     public String toString() {
  +             return "SpyMessageConsumer:"+subscription.destination;
        }
   }
  
  
  

Reply via email to