User: norbert 
  Date: 00/05/24 18:18:35

  Modified:    src/java/org/spyderMQ JMSServer.java SessionQueue.java
                        SpyConnection.java SpyMessageConsumer.java
                        SpyQueueConnection.java SpyQueueReceiver.java
                        SpyQueueSender.java SpyQueueSession.java
                        SpySession.java
  Added:       src/java/org/spyderMQ ConnectionQueue.java
  Log:
  More for P2P:
  Create a new class (ConnectionQueue) which holds the subscribers HashSet.
  
  Revision  Changes    Path
  1.32      +9 -9      spyderMQ/src/java/org/spyderMQ/JMSServer.java
  
  Index: JMSServer.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/JMSServer.java,v
  retrieving revision 1.31
  retrieving revision 1.32
  diff -u -r1.31 -r1.32
  --- JMSServer.java    2000/05/22 17:32:04     1.31
  +++ JMSServer.java    2000/05/25 01:18:33     1.32
  @@ -22,7 +22,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.31 $
  + *   @version $Revision: 1.32 $
    */
   public class JMSServer 
                implements Runnable 
  @@ -224,30 +224,30 @@
   
        public synchronized TemporaryTopic getTemporaryTopic(SpyDistributedConnection 
dc)
        {
  -             SpyTemporaryTopic t=new SpyTemporaryTopic("JMS_TT"+(new 
Integer(lastTemporaryTopic++).toString()),dc);
  +             SpyTemporaryTopic topic=new SpyTemporaryTopic("JMS_TT"+(new 
Integer(lastTemporaryTopic++).toString()),dc);
   
                synchronized (messageQueue) {
  -                     JMSServerQueue queue=new JMSServerQueue(t,dc,this);
  +                     JMSServerQueue queue=new JMSServerQueue(topic,dc,this);
                        HashMap newMap=(HashMap)messageQueue.clone();
  -                     newMap.put(t,queue); 
  +                     newMap.put(topic,queue); 
                        messageQueue=newMap;
                }
                
  -             return t;
  +             return topic;
        }
        
        public synchronized TemporaryQueue getTemporaryQueue(SpyDistributedConnection 
dc)
        {
  -             SpyTemporaryQueue q=new SpyTemporaryQueue("JMS_TQ"+(new 
Integer(lastTemporaryQueue++).toString()),dc);
  +             SpyTemporaryQueue newQueue=new SpyTemporaryQueue("JMS_TQ"+(new 
Integer(lastTemporaryQueue++).toString()),dc);
   
                synchronized (messageQueue) {
  -                     JMSServerQueue queue=new JMSServerQueue(q,dc,this);
  +                     JMSServerQueue sessionQueue=new 
JMSServerQueue(newQueue,dc,this);
                        HashMap newMap=(HashMap)messageQueue.clone();
  -                     newMap.put(q,queue); 
  +                     newMap.put(newQueue,sessionQueue); 
                        messageQueue=newMap;
                }
                
  -             return q;
  +             return newQueue;
        }
   
        //A connection is closing [error or notification]
  
  
  
  1.16      +20 -13    spyderMQ/src/java/org/spyderMQ/SessionQueue.java
  
  Index: SessionQueue.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SessionQueue.java,v
  retrieving revision 1.15
  retrieving revision 1.16
  diff -u -r1.15 -r1.16
  --- SessionQueue.java 2000/05/24 19:17:18     1.15
  +++ SessionQueue.java 2000/05/25 01:18:33     1.16
  @@ -20,7 +20,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.15 $
  + *   @version $Revision: 1.16 $
    */
   public class SessionQueue
   {
  @@ -28,21 +28,21 @@
   
        //List of messages waiting for acknoledgment
        LinkedList messagesWaitingForAck;
  -     //Is the session transacted ?
  -     boolean transacted;
  -     //What is the message acknowledgment mode ?
  -     int acknowledgeMode;
        //the MessageConsumers linked to this queue
        HashSet subscribers;
  +     //Number of listening receivers 
  +     int NumListeningSubscribers;
  +     //My SpySession
  +     SpySession session;
   
        // Constructor ---------------------------------------------------
           
  -     SessionQueue(boolean transacted,int acknowledgeMode)
  +     SessionQueue(SpySession session)
        {
                messagesWaitingForAck=new LinkedList();
                subscribers=new HashSet();
  -             this.transacted=transacted;
  -             this.acknowledgeMode=acknowledgeMode;           
  +             this.session=session;
  +             NumListeningSubscribers=0;
        }
   
        // Package protected ---------------------------------------------
  @@ -144,11 +144,6 @@
                }
        }       
        
  -     void notifyReceiverStopped(boolean newMode)
  -     {
  -             //For Queues. Not yet implemented -
  -     }
  -     
        synchronized void addConsumer(SpyMessageConsumer consumer)
        {
                consumer.setSessionQueue(this);
  @@ -163,6 +158,18 @@
                newSet.remove(consumer);
                subscribers=newSet;
                return subscribers.size()==0;
  +     }
  +
  +     synchronized void changeNumListening(int val)
  +     {
  +             NumListeningSubscribers+=val;
  +             
  +             if (val==-1&&NumListeningSubscribers==0) {
  +                     
((SpyQueueConnection)session.connection).changeNumListening(val);
  +             } else if (val==1&&NumListeningSubscribers==1) {
  +                     
((SpyQueueConnection)session.connection).changeNumListening(val);
  +             }
  +     
        }
        
   }
  
  
  
  1.30      +50 -49    spyderMQ/src/java/org/spyderMQ/SpyConnection.java
  
  Index: SpyConnection.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyConnection.java,v
  retrieving revision 1.29
  retrieving revision 1.30
  diff -u -r1.29 -r1.30
  --- SpyConnection.java        2000/05/19 19:28:49     1.29
  +++ SpyConnection.java        2000/05/25 01:18:33     1.30
  @@ -29,7 +29,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.29 $
  + *   @version $Revision: 1.30 $
    */
   public class SpyConnection 
                implements Connection, Serializable
  @@ -43,13 +43,13 @@
        protected String clientID;
        //the distributed object which receives messages from the JMS server
        protected SpyDistributedConnection distributedConnection;
  -     //HashMap of (HashSet of Sessions) by Destination
  -     public HashMap subscribers;
  +     //HashMap of ConnectionQueue by Destination
  +     public HashMap destinations;
        //LinkedList of all created sessions by this connection 
        HashSet createdSessions;
        //Last message ID returned
        private int lastMessageID;
  -     //Am I in stopped mode ?
  +     //Is the connection stopped ?
        protected boolean modeStop;
        //Is the connection closed ?
        boolean closed;
  @@ -64,7 +64,7 @@
        {
                //Set the attributes
                provider = theServer;
  -             subscribers=new HashMap();
  +             destinations=new HashMap();
                createdSessions=new HashSet();
                distributedConnection=null;
                closed=false;
  @@ -183,11 +183,11 @@
                
                try {
                        
  -                     //Remove it from the subscribers list
  -                     synchronized (subscribers) {
  -                             HashMap newMap=(HashMap)subscribers.clone();    
  +                     //Remove it from the destinations list
  +                     synchronized (destinations) {
  +                             HashMap newMap=(HashMap)destinations.clone();   
                                newMap.remove(dest);
  -                             subscribers=newMap;
  +                             destinations=newMap;
                        }
                        
                        //Notify its sessions that this TemporaryDestination is going 
to be deleted()
  @@ -240,33 +240,30 @@
                if (distributedConnection==null) createReceiver();
                                                                         
                Log.log("Connection: addSession(dest="+dest.toString()+")");
  +                             
                
  -             HashSet sub=(HashSet)subscribers.get(dest);
  -             if (sub==null) {
  -                     
  -                     synchronized (subscribers) {
  -                             sub=new HashSet();
  -                             sub.add(who);
  -                             HashMap newSubscribers=(HashMap)subscribers.clone();
  -                             newSubscribers.put(dest,sub);
  -                             subscribers=newSubscribers;
  -                     }
  -                     
  -                     try {
  -                             provider.subscribe(dest,distributedConnection);
  -                     } catch (Exception e) {
  -                             failureHandler(e,"Cannot subscribe to this 
Destination");
  -                     }       
  -                     
  -             } else {
  -                     
  -                     synchronized (subscribers) {
  -                             HashSet newSub=(HashSet)sub.clone();
  -                             newSub.add(who);
  -                             subscribers.put(dest,newSub);
  +             try {
  +
  +                     synchronized (destinations) {
  +                             
  +                             ConnectionQueue 
connectionQueue=(ConnectionQueue)destinations.get(dest);
  +                             
  +                             if (connectionQueue==null) {                    
  +                                     connectionQueue=new ConnectionQueue(dest,this);
  +                                     connectionQueue.addSession(who);
  +                                     HashMap 
newDestinations=(HashMap)destinations.clone();
  +                                     newDestinations.put(dest,connectionQueue);
  +                                     destinations=newDestinations;
  +                                     
provider.subscribe(dest,distributedConnection);                 
  +                             } else {                        
  +                                     connectionQueue.addSession(who);               
         
  +                             }
                        }
  -                     
  -             }
  +
  +             } catch (Exception e) {
  +                     failureHandler(e,"Cannot subscribe to this Destination");
  +             }       
  +             
        }               
        
        //The session does not need to recieve the messages to Destination dest
  @@ -277,28 +274,32 @@
                Log.log("Connection: removeSession(dest="+dest.toString()+")");
                
                try {
  -                     synchronized (subscribers) {
  -                             HashSet sub=(HashSet)subscribers.get(dest);     
  -                             if (sub!=null) {
  -                                     HashSet newSub=(HashSet)sub.clone();
  -                                     newSub.remove(who);
  -                                     if (newSub.isEmpty()) {
  -                                             HashMap 
newSubscribers=(HashMap)subscribers.clone();
  -                                             newSubscribers.remove(dest);
  -                                             subscribers=newSubscribers;
  +                     
  +                     synchronized (destinations) {
  +                             
  +                             ConnectionQueue 
connectionQueue=(ConnectionQueue)destinations.get(dest);
  +                             
  +                             if (connectionQueue!=null) {
  +                                     boolean 
empty=connectionQueue.removeSession(who);
  +                                     if (empty) {
  +                                             HashMap 
newDestinations=(HashMap)destinations.clone();
  +                                             newDestinations.remove(dest);
  +                                             destinations=newDestinations;
                                                
provider.unsubscribe(dest,distributedConnection);
  -                                     } else subscribers.put(dest,newSub);
  +                                     } 
                                } else {
                                        //this should not happen
  -                                     HashMap 
newSubscribers=(HashMap)subscribers.clone();
  -                                     newSubscribers.remove(dest);
  -                                     subscribers=newSubscribers;
  +                                     HashMap 
newDestinations=(HashMap)destinations.clone();
  +                                     newDestinations.remove(dest);
  +                                     destinations=newDestinations;
                                        
provider.unsubscribe(dest,distributedConnection);
                                }
  +                             
                        }
  +                     
                } catch (Exception e) {
                        failureHandler(e,"Cannot unsubscribe to this destination");
  -             }       
  +             }
   
        }
        
  @@ -333,7 +334,7 @@
                        createdSessions.remove(who);
                }
   
  -             //This session should not be in the subscribers object anymore. 
  +             //This session should not be in the "destinations" object anymore. 
                //We could check this, though
        }
        
  
  
  
  1.13      +2 -2      spyderMQ/src/java/org/spyderMQ/SpyMessageConsumer.java
  
  Index: SpyMessageConsumer.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyMessageConsumer.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -r1.12 -r1.13
  --- SpyMessageConsumer.java   2000/05/24 19:17:18     1.12
  +++ SpyMessageConsumer.java   2000/05/25 01:18:33     1.13
  @@ -20,7 +20,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.12 $
  + *   @version $Revision: 1.13 $
    */
   public class SpyMessageConsumer 
        implements MessageConsumer
  @@ -160,7 +160,7 @@
                                                                
                                                        
message.setSessionQueue(sessionQueue);
                                                                
  -                                             } else if 
(sessionQueue.acknowledgeMode==Session.DUPS_OK_ACKNOWLEDGE) {
  +                                             } else if 
(session.acknowledgeMode==Session.DUPS_OK_ACKNOWLEDGE) {
                                                        //DUPS_OK_ACKNOWLEDGE
                                                } else {
                                                        //AUTO_ACKNOWLEDGE 
  
  
  
  1.6       +7 -3      spyderMQ/src/java/org/spyderMQ/SpyQueueConnection.java
  
  Index: SpyQueueConnection.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyQueueConnection.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- SpyQueueConnection.java   2000/05/16 03:31:00     1.5
  +++ SpyQueueConnection.java   2000/05/25 01:18:33     1.6
  @@ -21,7 +21,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.5 $
  + *   @version $Revision: 1.6 $
    */
   public class SpyQueueConnection 
        extends SpyConnection 
  @@ -87,8 +87,7 @@
                        failureHandler(e,"Cannot create a temporary queue !");
                        return null;
                }
  -     }
  -     
  +     }       
   
        //Get a queue
        Queue createQueue(String name) throws JMSException
  @@ -103,4 +102,9 @@
                }
        }
   
  +     synchronized void changeNumListening(int val)
  +     {
  +             Log.log("Connection: changeNumListening("+((val>0)?"+)":"-)"));
  +     }
  +     
   }
  
  
  
  1.4       +96 -25    spyderMQ/src/java/org/spyderMQ/SpyQueueReceiver.java
  
  Index: SpyQueueReceiver.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyQueueReceiver.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- SpyQueueReceiver.java     2000/05/24 19:17:18     1.3
  +++ SpyQueueReceiver.java     2000/05/25 01:18:33     1.4
  @@ -11,13 +11,14 @@
   import javax.jms.Queue;
   import javax.jms.Message;
   import javax.jms.MessageListener;
  +import java.util.Date;
   
   /**
    *   This class implements javax.jms.QueueReceiver
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.3 $
  + *   @version $Revision: 1.4 $
    */
   public class SpyQueueReceiver 
        extends SpyMessageConsumer 
  @@ -28,15 +29,15 @@
        //The queue I registered
        private Queue queue;
        //Mode of this QueueReceiver
  -     boolean modeStop;
  +     boolean listening;
        
        // Constructor ---------------------------------------------------
           
  -    SpyQueueReceiver(SpyQueueSession s,Queue q,boolean stop) 
  +    SpyQueueReceiver(SpyQueueSession session,Queue queue) 
        {
  -             super(s);
  -             queue=q;
  -             modeStop=stop;
  +             super(session);
  +             this.queue=queue;
  +             listening=false;
        }
   
        // Public --------------------------------------------------------
  @@ -48,19 +49,12 @@
                return queue;
        }
   
  -     void setModeStop(boolean mode)
  -     {
  -             if (modeStop==mode) return;
  -             modeStop=mode;
  -             ((SpyQueueSession)session).notifyReceiverStopped(this,mode);
  -     }
  -     
        public void close() throws JMSException
        {       
                if (closed) return;
                closed=true;
   
  -             setModeStop(true);
  +             setListening(false);
        }
   
        //Overrides MessageConsumer
  @@ -68,17 +62,91 @@
       public Message receive() throws JMSException
        {
                if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");
  +                     
  +             setListening(true);
  +             
  +             synchronized (messages) {
  +                     
  +                     //if the client follows the specification [4.4.6], he cannot 
use this session
  +                     //to asynchronously receive a message or receive() in another 
thread.
  +                     //If a message is already pending for this session, we can 
immediatly deliver it 
                        
  -             //Not implemented yet
  -             return null;
  +                     while (true) {
  +                             
  +                             if (closed) {
  +                                     setListening(false);
  +                                     return null;
  +                             }
  +                             
  +                             if (!session.modeStop) {
  +                                     Message mes=getMessage();
  +                                     if (mes!=null) {
  +                                             setListening(false);
  +                                             return mes;
  +                                     }
  +                             } else Log.log("the connection is stopped !");
  +                             
  +                             try {
  +                                     waitInReceive=true;
  +                                     messages.wait();
  +                             } catch (InterruptedException e) {
  +                             } finally {
  +                                     waitInReceive=false;
  +                             }
  +                             
  +                     }
  +                     
  +             }
  +                             
        }
   
       public Message receive(long timeOut) throws JMSException
        {
                if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");
  +             
  +             if (timeOut==0) return receive();               
  +             long endTime=(new Date()).getTime()+timeOut;
                
  -             //Not implemented yet
  -             return null;
  +             setListening(true);
  +
  +             synchronized (messages) {
  +                     
  +                     //if the client respects the specification [4.4.6], he cannot 
use this session
  +                     //to asynchronously receive a message or receive() from 
another thread.
  +                     //If a message is already pending for this session, we can 
deliver it 
  +                                             
  +                     while (true) {
  +                             
  +                             if (closed) {
  +                                     setListening(false);
  +                                     return null;
  +                             }
  +                             
  +                             if (!session.modeStop) {
  +                                     Message mes=getMessage();
  +                                     if (mes!=null) {
  +                                             setListening(false);
  +                                             return mes;
  +                                     }
  +                             } else Log.log("the connection is stopped !");
  +                             
  +                             long att=endTime-((new Date()).getTime());
  +                             if (att<=0) {
  +                                     setListening(false);
  +                                     return null;
  +                             }
  +                             
  +                             try {                                   
  +                                     waitInReceive=true;
  +                                     messages.wait(att);
  +                             } catch (InterruptedException e) {
  +                             } finally {
  +                                     waitInReceive=false;
  +                             }
  +                             
  +                     }
  +             }
  +     
        }
   
       public Message receiveNoWait() throws JMSException
  @@ -93,14 +161,17 @@
        {       
                if (closed) throw new IllegalStateException("The MessageConsumer is 
closed");                   
                
  -             if (listener==null) {
  -                     sessionQueue.notifyReceiverStopped(false);
  -             } else {
  -                     sessionQueue.notifyReceiverStopped(true);
  -             }
  -
                messageListener=listener;
  -             
  +             setListening(listener!=null);
  +     }
  +     
  +     //---
  +     
  +     void setListening(boolean newvalue)
  +     {
  +             if (newvalue==listening) return;
  +             listening=newvalue;
  +             ((SpyQueueSession)session).notifyReceiverStopped(this,listening);
        }
   
   }
  
  
  
  1.2       +13 -13    spyderMQ/src/java/org/spyderMQ/SpyQueueSender.java
  
  Index: SpyQueueSender.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyQueueSender.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- SpyQueueSender.java       2000/05/15 02:08:58     1.1
  +++ SpyQueueSender.java       2000/05/25 01:18:33     1.2
  @@ -18,7 +18,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public class SpyQueueSender 
        extends SpyMessageProducer 
  @@ -27,31 +27,31 @@
        // Attributes ----------------------------------------------------
   
        //The session to which this sender is linked
  -     private SpyQueueSession mySession;
  +     private SpyQueueSession session;
        //The queue of this sender
  -     private Queue myQueue=null;
  +     private Queue queue=null;
        
        // Constructor ---------------------------------------------------
           
  -     SpyQueueSender(SpyQueueSession s,Queue q)
  +     SpyQueueSender(SpyQueueSession session,Queue queue)
        {
  -             mySession=s;
  -             myQueue=q;
  +             this.session=session;
  +             this.queue=queue;
        }
   
        // Public --------------------------------------------------------
   
       public Queue getQueue() throws JMSException
        {
  -             return myQueue;
  +             return queue;
        }
   
        //Send methods
        
       public void send(Message message) throws JMSException
        {
  -             if (myQueue==null) throw new InvalidDestinationException("I do not 
have a default Destination !");
  -             send(myQueue,message,defaultDeliveryMode,defaultPriority,defaultTTL);
  +             if (queue==null) throw new InvalidDestinationException("I do not have 
a default Destination !");
  +             send(queue,message,defaultDeliveryMode,defaultPriority,defaultTTL);
        }
   
       public void send(Queue queue, Message message) throws JMSException
  @@ -61,8 +61,8 @@
   
       public void send(Message message, int deliveryMode, int priority, long 
timeToLive) throws JMSException
        {
  -             if (myQueue==null) throw new InvalidDestinationException("I do not 
have a default Destination !");
  -             send(myQueue,message,deliveryMode,priority,timeToLive);
  +             if (queue==null) throw new InvalidDestinationException("I do not have 
a default Destination !");
  +             send(queue,message,deliveryMode,priority,timeToLive);
        }
   
        public void send(Queue queue, Message mes, int deliveryMode, int priority, 
long timeToLive) throws JMSException
  @@ -82,7 +82,7 @@
                        message.setJMSExpiration(timeToLive+ts.getTime());
                }
                message.setJMSPriority(priority);
  -             message.setJMSMessageID(mySession.getNewMessageID());
  +             message.setJMSMessageID(session.getNewMessageID());
                
                //Set the properties and the message body in ReadOnly mode
                //the client has to call clearProperties() and clearBody() if he wants 
to modify those values
  @@ -92,6 +92,6 @@
                message.setJMSRedelivered(false);
                
                //We must put a 'new message' in the Session's outgoing queue [3.9]    
                         
  -             mySession.sendMessage(message.myClone());
  +             session.sendMessage(message.myClone());
        }
   }
  
  
  
  1.9       +5 -5      spyderMQ/src/java/org/spyderMQ/SpyQueueSession.java
  
  Index: SpyQueueSession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyQueueSession.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- SpyQueueSession.java      2000/05/24 19:17:18     1.8
  +++ SpyQueueSession.java      2000/05/25 01:18:34     1.9
  @@ -23,7 +23,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.8 $
  + *   @version $Revision: 1.9 $
    */
   public class SpyQueueSession 
        extends SpySession 
  @@ -66,7 +66,7 @@
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
   
  -             SpyQueueReceiver receiver=new SpyQueueReceiver(this,queue,modeStop);
  +             SpyQueueReceiver receiver=new SpyQueueReceiver(this,queue);
                SessionQueue sessionQueue=addConsumer(queue,receiver);
                
                return receiver;
  @@ -84,7 +84,6 @@
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                                
  -             //Not implemented yet
                return new SpyQueueSender(this,queue);
        }
       
  @@ -149,9 +148,10 @@
        }       
        
        //One receiver is changing its mode
  -     synchronized void notifyReceiverStopped(SpyQueueReceiver receiver,boolean 
newMode)
  +     synchronized void notifyReceiverStopped(SpyQueueReceiver receiver,boolean mode)
        {
  -             //if (newMode==false) 
  +             Log.log("Session: 
notifyReceiverStopped(receiver="+receiver+",mode="+mode+")");
  +             receiver.sessionQueue.changeNumListening(mode?1:-1);
        }
        
   }
  
  
  
  1.17      +2 -2      spyderMQ/src/java/org/spyderMQ/SpySession.java
  
  Index: SpySession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpySession.java,v
  retrieving revision 1.16
  retrieving revision 1.17
  diff -u -r1.16 -r1.17
  --- SpySession.java   2000/05/24 19:17:18     1.16
  +++ SpySession.java   2000/05/25 01:18:34     1.17
  @@ -28,7 +28,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.16 $
  + *   @version $Revision: 1.17 $
    */
   public class SpySession 
        implements Runnable, Session
  @@ -384,7 +384,7 @@
                synchronized (destinations) {
                        SessionQueue sub=(SessionQueue)destinations.get(dest);
                        if (sub==null) {
  -                             sub=new SessionQueue(transacted,acknowledgeMode);
  +                             sub=new SessionQueue(this);
                                sub.addConsumer(who);
                                HashMap newDestinations=(HashMap)destinations.clone();
                                newDestinations.put(dest,sub);
  
  
  
  1.1                  spyderMQ/src/java/org/spyderMQ/ConnectionQueue.java
  
  Index: ConnectionQueue.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.JMSException;
  import javax.jms.Destination;
  import java.util.HashSet;
  import java.util.Iterator;
  
  /**
   *    Bookkeeping object used by SpyConnection: for one Destination it
   *    records the set of SpySessions that are registered on it.
   *
   *    The subscribers set is never modified in place. Each change builds a
   *    modified copy of the current set and then replaces the field with it.
   *      
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class ConnectionQueue
  {
        // Attributes ----------------------------------------------------
  
        //The Destination this object was created for
        Destination destination;
        //The SpySessions currently registered on this queue
        public HashSet subscribers;
        //Counter of listening sessions (this class only initialises it)
        int NumListeningSessions;
        //The SpyConnection this object was created for
        SpyConnection connection;
  
        // Constructor ---------------------------------------------------
           
        /**
         *    Creates an empty queue (no subscribers, zero listening sessions).
         *
         *    @param destination   the Destination to remember
         *    @param connection    the SpyConnection to remember
         */
        ConnectionQueue(Destination destination,SpyConnection connection)
        {
                this.destination=destination;
                this.connection=connection;
                NumListeningSessions=0;
                subscribers=new HashSet();
        }
  
        // Package protected ---------------------------------------------
  
        /**
         *    Registers a session: a copy of the current set, with the session
         *    added, becomes the new subscribers set.
         *
         *    @param session   the session to add
         */
        synchronized void addSession(SpySession session)
        {
                HashSet updated=copySubscribers();
                updated.add(session);
                subscribers=updated;
        }
  
        /**
         *    Unregisters a session: a copy of the current set, with the session
         *    removed, becomes the new subscribers set.
         *
         *    @param session   the session to remove
         *    @return true when no subscriber is left after the removal
         */
        synchronized boolean removeSession(SpySession session)
        {
                HashSet updated=copySubscribers();
                updated.remove(session);
                subscribers=updated;
                return subscribers.isEmpty();
        }
  
        // Private -------------------------------------------------------
  
        //Shallow copy of the current subscribers set
        private HashSet copySubscribers()
        {
                return (HashSet)subscribers.clone();
        }
  
  }
  
  
  

Reply via email to