User: norbert 
  Date: 00/06/04 20:19:24

  Modified:    src/java/org/spydermq ConnectionQueue.java
                        JMSServerQueue.java SessionQueue.java
                        SpyConnection.java SpyQueueReceiver.java
                        SpyQueueSession.java
  Log:
  Add synchronization
  Local optimization
  
  Revision  Changes    Path
  1.3       +20 -16    spyderMQ/src/java/org/spydermq/ConnectionQueue.java
  
  Index: ConnectionQueue.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/ConnectionQueue.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- ConnectionQueue.java      2000/05/31 23:22:55     1.2
  +++ ConnectionQueue.java      2000/06/05 03:19:23     1.3
  @@ -16,7 +16,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   public class ConnectionQueue
   {
  @@ -58,26 +58,30 @@
                return subscribers.size()==0;
        }
   
  -     synchronized void changeNumListening(int val) throws JMSException
  +     void changeNumListening(int val) throws JMSException
        {
  -             NumListeningSessions+=val;
  +             //Before check
  +             
  +             //Synchronized part
  +             synchronized (this) {
  +                     NumListeningSessions+=val;
   
  -             Log.log("ConnectionQueue: changeNumListening(listening sessions="+NumListeningSessions+")");
  +                     Log.log("ConnectionQueue: changeNumListening(listening sessions="+NumListeningSessions+")");
                
  -             if (connection.modeStop) return;
  +                     if (connection.modeStop) return;
                
  -             try {           
  -                     
  -                     if (val==-1&&NumListeningSessions==0) {
  -                             
connection.provider.connectionListening(false,destination,connection.distributedConnection);
  -                     } else if (val==1&&NumListeningSessions==1) {
  -                             
connection.provider.connectionListening(true,destination,connection.distributedConnection);
  +                     try {           
  +                             
  +                             if (val==-1&&NumListeningSessions==0) {
  +                                     
connection.provider.connectionListening(false,destination,connection.distributedConnection);
  +                             } else if (val==1&&NumListeningSessions==1) {
  +                                     
connection.provider.connectionListening(true,destination,connection.distributedConnection);
  +                             }
  +                             
  +                     } catch (Exception e) {
  +                             connection.failureHandler(e,"Cannot contact the JMS server");
                        }
  -                     
  -             } catch (Exception e) {
  -                     connection.failureHandler(e,"Cannot contact the JMS server");
  -             }
  -     
  +             }       
        }
        
        synchronized void start() throws JMSException
  
  
  
  1.8       +86 -74    spyderMQ/src/java/org/spydermq/JMSServerQueue.java
  
  Index: JMSServerQueue.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/JMSServerQueue.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- JMSServerQueue.java       2000/06/01 02:47:48     1.7
  +++ JMSServerQueue.java       2000/06/05 03:19:23     1.8
  @@ -18,7 +18,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.7 $
  + *   @version $Revision: 1.8 $
    */
   public class JMSServerQueue
   {
  @@ -265,67 +265,72 @@
                        
                } else {
                        
  -                     while (true) {
  +                     synchronized (this) {
  +                             //In the Queue case, we synchronize on [this] to avoid 
changes (listening modifications)
  +                             //while we are dispatching messages
                                
  -                             //At first, find a receiver
  -                             //NL: We could find a better receiver (load balancing 
?)
  -                             
  -                             Log.log("get a receiver");
  -                             
  -                             if (listeners==0) break;
  -                             Iterator i=subscribers.values().iterator();
  -                             SpyDistributedConnection dc=null;
  -                             while (i.hasNext()) {
  -                                     dc=(SpyDistributedConnection)i.next();         
                         
  -                                     if (dc.listeners) break;
  -                             }
  -                             
  -                             if (dc==null||!dc.listeners) {
  -                                     Log.error("FIXME: The listeners count was invalid !");
  -                                     break;
  -                             }
  -
  -                             //Get the message ( if there is one message pending )
  -                             SpyMessage mes=startWorkQueue();
  -                             if (mes==null) break;
  -                             if (mes.isOutdated()) continue;
  -                                                             
  -                             //Send the message
  -                             try {
  -                                     dc.cr.receive(destination,mes);
  -                             } catch (NoReceiverException e) {
  +                             while (true) {
  +                                                                     
  +                                     //At first, find a receiver
  +                                     //NL: We could find a better receiver (load 
balancing ?)
  +                                     
  +                                     Log.log("get a receiver");
  +                                     
  +                                     if (listeners==0) break;
  +                                     Iterator i=subscribers.values().iterator();
  +                                     SpyDistributedConnection dc=null;
  +                                     while (i.hasNext()) {
  +                                             dc=(SpyDistributedConnection)i.next(); 
                                 
  +                                             if (dc.listeners) break;
  +                                     }
                                        
  -                                     Log.log(e);
  +                                     if (dc==null||!dc.listeners) {
  +                                             Log.error("FIXME: The listeners count was invalid !");
  +                                             break;
  +                                     }
   
  +                                     //Get the message ( if there is one message 
pending )
  +                                     SpyMessage mes=startWorkQueue();
  +                                     if (mes==null) break;
  +                                     if (mes.isOutdated()) continue;
  +                                                                     
  +                                     //Send the message
                                        try {
  -                                             addMessage(mes);
  -                                             connectionListening(false,dc);
  -                                     } catch (Exception e2) {
  -                                             Log.error(e2);
  -                                     }                               
  +                                             dc.cr.receive(destination,mes);
  +                                     } catch (NoReceiverException e) {
  +                                             
  +                                             Log.log(e);
   
  -                             } catch (JMSException e) {
  -                                       throw e;
  -                             } catch (Exception e) {
  -                                     //This is a transport failure. We should 
define our own Transport Failure class
  -                                     //to catch errors more precisely.
  -                                                                               
  -                                     try {
  -                                             addMessage(mes);
  -                                     } catch (Exception e2) {
  -                                             Log.error(e2);
  -                                     }
  -                                     
  -                                     Log.error("Cannot deliver this message to the 
client "+dc);                                     
  -                                     Log.error(e);
  -                                     handleConnectionFailure(dc,null);
  -                             } 
  -                                                             
  -                     }
  -                     
  -                     //Notify that it has finished its work : another thread can 
start working on this queue
  -                     endWork();
  +                                             try {
  +                                                     addMessage(mes);
  +                                                     connectionListening(false,dc);
  +                                             } catch (Exception e2) {
  +                                                     Log.error(e2);
  +                                             }                               
  +
  +                                     } catch (JMSException e) {
  +                                               throw e;
  +                                     } catch (Exception e) {
  +                                             //This is a transport failure. We 
should define our own Transport Failure class
  +                                             //for a better execption catching
  +                                                                                    
   
  +                                             try {
  +                                                     addMessage(mes);
  +                                             } catch (Exception e2) {
  +                                                     Log.error(e2);
  +                                             }
                                                
  +                                             Log.error("Cannot deliver this message 
to the client "+dc);                                     
  +                                             Log.error(e);
  +                                             handleConnectionFailure(dc,null);
  +                                     } 
  +                                                                     
  +                             }
  +                             
  +                             //Notify that it has finished its work : another 
thread can start working on this queue
  +                             endWork();
  +                                                     
  +                     }
                }
        }
        
  @@ -339,30 +344,37 @@
                
        void connectionListening(boolean mode,SpyDistributedConnection dc) throws 
JMSException 
        {
  -             SpyDistributedConnection 
distributedConnection=(SpyDistributedConnection)subscribers.get(dc.getClientID());
  -             if (distributedConnection==null) throw new JMSException("This DistributedConnection is not registered");
  +     
  +             // Before check 
                
  -             if (mode) {                     
  -                     if (!distributedConnection.listeners) {
  -                             distributedConnection.listeners=true;
  -                             listeners++;
  -                     }
  -             } else {
  -                     if (distributedConnection.listeners) {
  -                             distributedConnection.listeners=false;
  -                             listeners--;
  +             // Synchronized code : We want to avoid sending messages while we are 
changing the connection status 
  +             synchronized (this) {
  +                     
  +                     
  +                     SpyDistributedConnection 
distributedConnection=(SpyDistributedConnection)subscribers.get(dc.getClientID());
  +                     if (distributedConnection==null) throw new JMSException("This DistributedConnection is not registered");
  +             
  +                     if (mode) {                     
  +                             if (!distributedConnection.listeners) {
  +                                     distributedConnection.listeners=true;
  +                                     listeners++;
  +                             }
  +                     } else {
  +                             if (distributedConnection.listeners) {
  +                                     distributedConnection.listeners=false;
  +                                     listeners--;
  +                             }
                        }
  -             }
   
                
  -             if (listeners!=0&&!threadWorking&&!alreadyInTaskQueue) {
  -                     synchronized (messages) {
  -                             if (!messages.isEmpty()) notifyWorkers();
  +                     if (listeners!=0&&!threadWorking&&!alreadyInTaskQueue) {
  +                             synchronized (messages) {
  +                                     if (!messages.isEmpty()) notifyWorkers();
  +                             }
                        }
  -             }
  -             
  -             Log.log("Listeners for "+destination+" = "+listeners);
                
  +                     Log.log("Listeners for "+destination+" = "+listeners);
  +             }               
        }
        
   }
  
  
  
  1.4       +6 -5      spyderMQ/src/java/org/spydermq/SessionQueue.java
  
  Index: SessionQueue.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SessionQueue.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- SessionQueue.java 2000/06/01 20:11:48     1.3
  +++ SessionQueue.java 2000/06/05 03:19:23     1.4
  @@ -21,7 +21,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.3 $
  + *   @version $Revision: 1.4 $
    */
   public class SessionQueue
   {
  @@ -180,15 +180,15 @@
        public void dispatchMessage(Destination dest, SpyMessage mes) throws 
JMSException
        {       
                Log.log("SessionQueue: 
dispatchMessage(Destination="+dest.toString()+",Mes="+mes.toString()+")");
  -             
  +                             
                while (true) {
  -
  +             
  +                     //We should synchronize there !!
                        if (session.closed) throw new NoReceiverException("The session 
is closed");
                        if (session.modeStop) throw new NoReceiverException("The 
session is stopped");
                        if (NumListeningSubscribers==0) throw new 
NoReceiverException("There are no receivers for this destination !");
                        if (mes.isOutdated()) return;
                
  -                     //We should use a linked list of all Receivers (enable 
synchronization)
                        Iterator i=subscribers.iterator();
                        SpyQueueReceiver receiver=null;
                        while (i.hasNext()) {
  @@ -200,8 +200,8 @@
                                throw new NoReceiverException("The listeners count was 
invalid !");
                        }
   
  +                     //Work with this receiver
                        synchronized (receiver.messages) {
  -                                             
                                if (receiver.messageListener==null) {
                                        if (!receiver.waitInReceive) {
                                                Log.notice("The receiver was not 
waiting for a message !"); 
  @@ -216,6 +216,7 @@
                                }
                        }
                        
  +                     //Our work is done here
                        break;
   
                }
  
  
  
  1.4       +26 -8     spyderMQ/src/java/org/spydermq/SpyConnection.java
  
  Index: SpyConnection.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyConnection.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- SpyConnection.java        2000/06/01 00:04:40     1.3
  +++ SpyConnection.java        2000/06/05 03:19:23     1.4
  @@ -29,7 +29,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.3 $
  + *   @version $Revision: 1.4 $
    */
   public class SpyConnection 
                implements Connection, Serializable
  @@ -235,14 +235,32 @@
                if (distributedConnection==null) createReceiver();
                
                try {
  -                                                                     
  -                     provider.newMessage(mes,clientID);
  -                     
  -                     /*if (mes.jmsDestination instanceof Topic) {
  -                             //If this message is sent to a topic, we can try to 
deliver it locally
  -                             
distributedConnection.cr.receive(mes.jmsDestination,mes);
  -                     }*/
  +
  +                     for(int i=0;i<mes.length;i++) {
  +                             
  +                             SpyMessage message=mes[i];
  +                             
  +                             if (message.isOutdated()) continue;
  +                             Class messageClass=message.jmsDestination.getClass();
  +                             
  +                             if (messageClass==SpyTemporaryTopic.class) {           
                         
  +                                     if 
(((SpyTemporaryTopic)message.jmsDestination).dc.equals(distributedConnection)) {
  +                                             Log.log("local");
  +                                     }
  +                             } else if (messageClass==SpyTemporaryQueue.class) {
  +                                     if 
(((SpyTemporaryQueue)message.jmsDestination).dc.equals(distributedConnection)) {
  +                                             Log.log("local");
  +                                     }
  +                             } else if (messageClass==SpyTopic.class) {
  +                                     //Alpha mode test
  +                             } else {
  +                                     //Alpha mode test + local delivery
  +                             }
  +                             
  +                     }
                        
  +                     provider.newMessage(mes,clientID);
  +                                             
                } catch (Exception e) {
                        failureHandler(e,"Cannot send a message to the JMS provider");
                }
  
  
  
  1.3       +6 -4      spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java
  
  Index: SpyQueueReceiver.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- SpyQueueReceiver.java     2000/06/01 01:14:29     1.2
  +++ SpyQueueReceiver.java     2000/06/05 03:19:23     1.3
  @@ -18,7 +18,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   public class SpyQueueReceiver 
        extends SpyMessageConsumer 
  @@ -51,10 +51,12 @@
   
        public void close() throws JMSException
        {       
  -             if (closed) return;
  -             closed=true;
  +             synchronized (messages) {
  +                     if (closed) return;
  +                     closed=true;
   
  -             setListening(false);
  +                     setListening(false);
  +             }
        }
   
        //Overrides MessageConsumer
  
  
  
  1.2       +2 -2      spyderMQ/src/java/org/spydermq/SpyQueueSession.java
  
  Index: SpyQueueSession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueSession.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- SpyQueueSession.java      2000/05/31 18:06:45     1.1
  +++ SpyQueueSession.java      2000/06/05 03:19:23     1.2
  @@ -23,7 +23,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public class SpyQueueSession 
        extends SpySession 
  @@ -138,7 +138,7 @@
                }
   
                //Notify the [sleeping ?] thread that there is work to do
  -             //We should not wait for the lock...
  +             //We should not have to wait for the lock...
                synchronized (thread)
                {
                        thread.notify();
  
  
  

Reply via email to