User: norbert 
  Date: 00/06/14 12:16:52

  Modified:    src/java/org/spydermq JMSServer.java SpyQueueSession.java
                        SpySession.java SpyTopicSession.java
                        SpyTopicSubscriber.java
  Added:       src/java/org/spydermq JMSServerMBean.java Mutex.java
  Log:
  Add some JMX instrumentation.
  Change the JNP server.
  Add a Mutex object for better synchronization.
  
  Revision  Changes    Path
  1.4       +3 -2      spyderMQ/src/java/org/spydermq/JMSServer.java
  
  Index: JMSServer.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/JMSServer.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- JMSServer.java    2000/06/01 02:47:48     1.3
  +++ JMSServer.java    2000/06/14 19:16:50     1.4
  @@ -22,16 +22,17 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.3 $
  + *   @version $Revision: 1.4 $
    */
   public class JMSServer 
  -             implements Runnable 
  +             implements Runnable, JMSServerMBean
   {
        
        // Constants -----------------------------------------------------
       
        //number of threads in the pool (TO DO: this value should be dynamic)
        final int NB_THREADS=1;
  +     public static final String OBJECT_NAME = "JMS:service=JMSServer";
   
        // Attributes ----------------------------------------------------
      
  
  
  
  1.3       +3 -7      spyderMQ/src/java/org/spydermq/SpyQueueSession.java
  
  Index: SpyQueueSession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueSession.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- SpyQueueSession.java      2000/06/05 03:19:23     1.2
  +++ SpyQueueSession.java      2000/06/14 19:16:51     1.3
  @@ -23,7 +23,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   public class SpyQueueSession 
        extends SpySession 
  @@ -137,12 +137,8 @@
                        
                }
   
  -             //Notify the [sleeping ?] thread that there is work to do
  -             //We should not have to wait for the lock...
  -             synchronized (thread)
  -             {
  -                     thread.notify();
  -             }
  +             //Notify the [sleeping ?] thread that there is work to do              
 
  +             mutex.notifyLock();
        }       
        
   }
  
  
  
  1.3       +71 -69    spyderMQ/src/java/org/spydermq/SpySession.java
  
  Index: SpySession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpySession.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- SpySession.java   2000/06/01 01:14:29     1.2
  +++ SpySession.java   2000/06/14 19:16:51     1.3
  @@ -28,7 +28,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.2 $
  + *   @version $Revision: 1.3 $
    */
   public class SpySession 
        implements Runnable, Session
  @@ -55,7 +55,7 @@
        //Is the session closed ?
        boolean closed;
        //This object is the object used to synchronize the session's thread - Need 
fixed / improvement
  -     public Integer thread;
  +     public Mutex mutex;
        //Is this session in alpha mode ?
        public boolean alphaMode;
   
  @@ -72,7 +72,7 @@
                modeStop=stop;
                messageListener=null;
                closed=false;
  -             thread=new Integer(0);
  +             mutex=new Mutex();
                alphaMode=true;
                
                //Start my thread 
  @@ -169,74 +169,74 @@
        public void run()
        {
                Log.log("Hi ! I'm a session thread :)");
  +                                     
  +             mutex.acquireLock();
                
                while (true) {
   
  -                     synchronized (thread) {
  -                             boolean doneJob=false;                          
  +                     boolean doneJob=false;                          
                                
  -                             if (closed) return;
  +                     if (closed) break;
                                
  -                             //look at outgoing queues
  +                     //look at outgoing queues
                                
  -                             SpyMessage outgoingJob[]=null;
  +                     SpyMessage outgoingJob[]=null;
                                
  -                             if (transacted) {               
  -                                     synchronized (outgoingCommitedQueue) {
  -                                             //The session is transacted, we take 
the outgoing msgs from outgoingCommitedQueue
  -                                             if (outgoingCommitedQueue.size()!=0) {
  -                                                     SpyMessage array[]=new 
SpyMessage[outgoingCommitedQueue.size()];
  -                                                     
outgoingJob=(SpyMessage[])outgoingCommitedQueue.toArray(array);
  -                                                     outgoingCommitedQueue.clear();
  -                                             }                                      
                                         
  -                                     }
  -                             } else {
  -                                     synchronized (outgoingQueue) {
  -                                             //The session is not transacted, we 
take the outgoing msgs from outgoingQueue
  -                                             if (outgoingQueue.size()!=0) {
  -                                                     SpyMessage array[]=new 
SpyMessage[outgoingQueue.size()];
  -                                                     
outgoingJob=(SpyMessage[])outgoingQueue.toArray(array);
  -                                                     outgoingQueue.clear();
  -                                             }
  +                     if (transacted) {               
  +                             synchronized (outgoingCommitedQueue) {
  +                                     //The session is transacted, we take the 
outgoing msgs from outgoingCommitedQueue
  +                                     if (outgoingCommitedQueue.size()!=0) {
  +                                             SpyMessage array[]=new 
SpyMessage[outgoingCommitedQueue.size()];
  +                                             
outgoingJob=(SpyMessage[])outgoingCommitedQueue.toArray(array);
  +                                             outgoingCommitedQueue.clear();
  +                                     }                                              
                                 
  +                             }
  +                     } else {
  +                             synchronized (outgoingQueue) {
  +                                     //The session is not transacted, we take the 
outgoing msgs from outgoingQueue
  +                                     if (outgoingQueue.size()!=0) {
  +                                             SpyMessage array[]=new 
SpyMessage[outgoingQueue.size()];
  +                                             
outgoingJob=(SpyMessage[])outgoingQueue.toArray(array);
  +                                             outgoingQueue.clear();
                                        }
                                }
  +                     }
                                
  -                             if (outgoingJob!=null) {                               
         
  -                                     try {
  -                                             //Check for outdated messages !
  -                                             connection.sendToServer(outgoingJob);
  -                                             doneJob=true;
  -                                     } catch (JMSException e) {
  -                                             Log.log("Cannot send 
"+outgoingJob.toString()+" to the provider...");
  -                                             Log.error(e);
  -                                     }
  +                     if (outgoingJob!=null) {                                       
 
  +                             try {
  +                                     //Check for outdated messages !
  +                                     connection.sendToServer(outgoingJob);
  +                                     doneJob=true;
  +                             } catch (JMSException e) {
  +                                     Log.log("Cannot send 
"+outgoingJob.toString()+" to the provider...");
  +                                     Log.error(e);
                                }
  +                     }
                                
  -                             //if we are not in stopped mode, look at the incoming 
queue                             
  +                     //if we are not in stopped mode, look at the incoming queue    
                         
                                
  -                             if (!modeStop) {
  +                     if (!modeStop) {
                                                
  -                                     Collection values = destinations.values();
  -                                     Iterator i=values.iterator();
  -                                     while (i.hasNext()) {
  -                                             SessionQueue 
sessionQueue=(SessionQueue)i.next();
  -                                             
doneJob=doneJob||sessionQueue.deliverMessage();
  -                                     }
  -                                     
  +                             Collection values = destinations.values();
  +                             Iterator i=values.iterator();
  +                             while (i.hasNext()) {
  +                                     SessionQueue 
sessionQueue=(SessionQueue)i.next();
  +                                     doneJob=doneJob||sessionQueue.deliverMessage();
                                }
                                        
  -                             //If there was smthg to do, try again
  -                             if (doneJob) continue;
  +                     }
                                        
  -                             try {
  -                                     Log.log("SessionThread: I'm going to bed...");
  -                                     thread.wait();
  -                                     Log.log("SessionThread: I wake up");
  -                             } catch (InterruptedException e) {                     
                
  -                             }
  +                     //If there was smthg to do, try again
  +                     if (doneJob) continue;
  +                                     
  +                     Log.log("SessionThread: I'm going to bed...");
  +                     mutex.waitLock();
  +                     Log.log("SessionThread: I wake up");
   
  -                     }
                }
  +                     
  +             mutex.releaseLock();
  +             
        }
   
        public synchronized void close() throws JMSException
  @@ -244,10 +244,9 @@
                if (closed) return;
                closed=true;
   
  -             //if the thread is sleeping, kill it
  -             synchronized (thread) {
  -                     thread.notify();
  -             }
  +             //if the thread is sleeping, kill it            
  +             mutex.notifyLock();
  +             mutex.waitToSleep();
                
                //notify the sleeping synchronous listeners
                
  @@ -281,8 +280,10 @@
                modeStop=true;
                
                //Wait for the thread to sleep
  -             synchronized (thread) {
  +             synchronized (mutex) {
                        
  +                     mutex.waitToSleep();
  +                     
                        //Move the outgoing messages from the outgoingQueue to the 
outgoingCommitedQueue
                        outgoingCommitedQueue.addAll(outgoingQueue);
                        outgoingQueue.clear();
  @@ -297,7 +298,8 @@
                        
                        //We have finished our work, we can wake up the thread
                        modeStop=modeSav;
  -                     thread.notify();
  +                     mutex.notify();
  +                     
                }
                
        }
  @@ -314,7 +316,9 @@
                modeStop=true;
                
                //Wait for the thread to sleep
  -             synchronized (thread) {
  +             synchronized (mutex) {
  +                     
  +                     mutex.waitToSleep();
                        
                        //Clear the outgoing queue
                        outgoingQueue.clear();
  @@ -329,7 +333,7 @@
                        
                        //We have finished our work, we can wake up the thread
                        modeStop=modeSav;
  -                     thread.notify();
  +                     mutex.notify();
                }
        }
   
  @@ -344,8 +348,10 @@
                modeStop=true;
                
                //Wait for the thread to sleep
  -             synchronized (thread) {
  +             synchronized (mutex) {
                        
  +                     mutex.waitToSleep();
  +                     
                        //Notify each SessionQueue that we are going to recover
                        Collection values = destinations.values();
                        Iterator i=values.iterator();
  @@ -356,7 +362,7 @@
                        
                        //We have finished our work, we can wake up the thread
                        modeStop=modeSav;
  -                     thread.notify();
  +                     mutex.notify();
                }
                
                
  @@ -434,7 +440,7 @@
        //The connection has changed its mode (stop() or start())
        //We have to wait until message delivery has stopped or wake up the thread
        void notifyStopMode(boolean newValue)
  -        {
  +    {
                
                if (closed) throw new IllegalStateException("The session is closed");  
                                                                  
                if (modeStop==newValue) return; 
  @@ -444,16 +450,12 @@
                if (modeStop) {
                        
                        //Wait for the thread to sleep
  -                     synchronized (thread) {
  -                             ;
  -                     }
  +                     mutex.waitToSleep();
                                                
                } else {
                        
                        //Wake up the thread
  -                     synchronized (thread) {
  -                             thread.notify();
  -                     }
  +                     mutex.notifyLock();
                        
                }
                
  
  
  
  1.2       +2 -6      spyderMQ/src/java/org/spydermq/SpyTopicSession.java
  
  Index: SpyTopicSession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSession.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- SpyTopicSession.java      2000/05/31 18:06:47     1.1
  +++ SpyTopicSession.java      2000/06/14 19:16:51     1.2
  @@ -24,7 +24,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public class SpyTopicSession 
        extends SpySession 
  @@ -163,11 +163,7 @@
                }
   
                //notify the thread that there is work to do
  -             //we should change this...
  -             synchronized (thread)
  -             {
  -                     thread.notify();
  -             }
  +             mutex.notifyLock();
   
        }
        
  
  
  
  1.4       +3 -4      spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java
  
  Index: SpyTopicSubscriber.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSubscriber.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- SpyTopicSubscriber.java   2000/06/09 20:03:58     1.3
  +++ SpyTopicSubscriber.java   2000/06/14 19:16:51     1.4
  @@ -19,7 +19,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.3 $
  + *   @version $Revision: 1.4 $
    */
   public class SpyTopicSubscriber 
        extends SpyMessageConsumer 
  @@ -165,9 +165,8 @@
                messageListener=listener;
                
                //Signal the change to the session thread ( it could sleep, while 
there are messages for him )
  -             synchronized (session.thread) {
  -                     session.thread.notify();
  -             }
  +             session.mutex.notify();
  +             
        }
        
   }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/JMSServerMBean.java
  
  Index: JMSServerMBean.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  import javax.jms.JMSException;
  
  /**
   *    Operations that JMSServer exposes through its MBean interface.
   *
   *    The interface only declares the two destination factory operations
   *    below; their behaviour is defined by the implementing class.
   *
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public interface JMSServerMBean
  {
        /**
         * Returns a topic for the given name.
         * NOTE(review): presumably creates and registers a new topic on the
         * server - confirm against the implementing class.
         *
         * @param name name of the topic
         * @return the topic object
         * @throws JMSException declared by the contract; the failure conditions
         *         are defined by the implementation
         */
        SpyTopic newTopic(String name) throws JMSException;

        /**
         * Returns a queue for the given name.
         * NOTE(review): presumably creates and registers a new queue on the
         * server - confirm against the implementing class.
         *
         * @param name name of the queue
         * @return the queue object
         * @throws JMSException declared by the contract; the failure conditions
         *         are defined by the implementation
         */
        SpyQueue newQueue(String name) throws JMSException;
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/Mutex.java
  
  Index: Mutex.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq;
  
  /**
   *    Hand-off primitive between one worker thread and the threads that
   *    feed or pause it.
   *
   *    The worker calls acquireLock() once, then alternates between doing work
   *    and parking in waitLock(), and finally calls releaseLock(). Other
   *    threads call notifyLock() to signal that there is work to do, and
   *    waitToSleep() to block until the worker is parked (or has released).
   *
   *    Two monitors are used: the flags sleeping, locked and work are guarded
   *    by this object's monitor, while the worker actually parks on the
   *    private monitor obj. The wakeup flag (guarded by obj) records a
   *    notification that was sent before the worker reached obj.wait(), so
   *    that such a notification is not lost.
   *
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class Mutex
  {
        /** True from the moment the worker decides to park until it has woken up. Guarded by this. */
        private boolean sleeping;
        /** True between acquireLock() and releaseLock(). Guarded by this. */
        private boolean locked;
        /** Set by notifyLock(), consumed by waitLock(). Guarded by this. */
        private boolean work;
        /** Set by notifyLock() when it wakes a parked worker. Guarded by obj. */
        private boolean wakeup;
        /** Monitor on which the worker parks. */
        private final Object obj;
        
        Mutex()
        {
                sleeping=false;
                locked=false;
                work=false;
                wakeup=false;
                obj=new Object();
        }
        
        /**
         * Marks the mutex as held by the worker.
         *
         * @throws IllegalStateException if the mutex is already held
         */
        public synchronized void acquireLock()
        {
                if (locked) throw new IllegalStateException("Already locked !");
                locked=true;
        }
        
        /**
         * Marks the mutex as no longer held and wakes every thread blocked in
         * waitToSleep().
         *
         * @throws IllegalStateException if the mutex is not held
         */
        public synchronized void releaseLock()
        {
                if (!locked) throw new IllegalStateException("Not locked !");
                locked=false;
                //Several threads may be blocked in waitToSleep(): wake them all
                this.notifyAll();
        }
  
        /**
         * Signals the worker that there is work to do. If the worker is
         * parked it is woken up; otherwise its next waitLock() call returns
         * immediately.
         *
         * @throws IllegalStateException if the mutex is not held
         */
        public synchronized void notifyLock()
        {
                if (!locked) throw new IllegalStateException("Not locked !");
                                
                work=true;
                if (!sleeping) return;
                
                synchronized (obj) {
                        //Record the wake-up so that it is not lost when the
                        //worker has not reached obj.wait() yet
                        wakeup=true;
                        obj.notify();
                }
                
        }
        
        /**
         * Blocks until the worker is parked in waitLock() or the mutex has
         * been released. Returns immediately if either is already the case.
         * If the calling thread is interrupted while waiting, it keeps
         * waiting and the interrupt status is restored before returning.
         */
        public synchronized void waitToSleep()
        {
                boolean interrupted=false;
                
                //Re-check the condition after every wake-up: the monitor can
                //be notified for other reasons, and wait() may wake spuriously
                while (locked && !sleeping) {
                        try {
                                this.wait();    
                        } catch (InterruptedException e) {
                                interrupted=true;
                        }
                }
                
                if (interrupted) Thread.currentThread().interrupt();
        }
  
        /**
         * Parks the worker until notifyLock() is called. Returns immediately
         * (consuming the pending notification) if notifyLock() was called
         * since the last time the worker woke up. If the worker is
         * interrupted while parked, it stays parked until notified and the
         * interrupt status is restored before returning.
         *
         * @throws IllegalStateException if the mutex is not held, or if the
         *         worker is already marked as sleeping
         */
        public void waitLock()
        {
                synchronized (this) 
                {               
                        if (!locked) throw new IllegalStateException("Not locked !");
                        if (sleeping) throw new IllegalStateException("Already sleeping !");
                
                        if (work) {
                                work=false;
                                return;
                        }
                
                        sleeping=true;
                        
                        //No notifier can run while we hold this monitor, so any
                        //wake-up still recorded belongs to a previous sleep
                        synchronized (obj) {
                                wakeup=false;
                        }
                        
                        //Tell the threads blocked in waitToSleep() that we are going to sleep
                        this.notifyAll();
                }
                
                boolean interrupted=false;
                
                synchronized (obj) {
                        
                        //The flag closes the window between leaving the block
                        //above and waiting here, and filters spurious wake-ups
                        while (!wakeup) {
                                try {
                                        obj.wait();
                                } catch (InterruptedException e) {
                                        interrupted=true;
                                }
                        }
                        
                }
                
                synchronized (this) 
                {
                        sleeping=false;
                        work=false;
                }
                
                if (interrupted) Thread.currentThread().interrupt();
                
        }
  
  }
  
  
  
  

Reply via email to