User: hiram   
  Date: 00/11/13 22:15:54

  Modified:    src/java/org/spydermq SpyQueueSession.java SpySession.java
                        SpyTopicSession.java
  Log:
  We can now disable client side persistence.  All actions
  which require persistence are synchronized with the
  server for it to do the persistence.
  Only the QueueSessions have this option turned on for now.
  
  Revision  Changes    Path
  1.4       +27 -9     spyderMQ/src/java/org/spydermq/SpyQueueSession.java
  
  Index: SpyQueueSession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueSession.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- SpyQueueSession.java      2000/06/14 19:16:51     1.3
  +++ SpyQueueSession.java      2000/11/14 06:15:53     1.4
  @@ -18,12 +18,12 @@
   import java.util.HashMap;
   import java.util.Iterator;
   
  -/**
  +import javax.jms.DeliveryMode;/**
    *   This class implements javax.jms.QueueSession
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.3 $
  + *   @version $Revision: 1.4 $
    */
   public class SpyQueueSession 
        extends SpySession 
  @@ -35,11 +35,12 @@
        SpyQueueSession(SpyConnection myConnection, boolean transacted, int 
acknowledgeMode, boolean stop)
        {
                super(myConnection,transacted,acknowledgeMode,stop);
  +             clientPersistence = false;
        }
   
        // Public --------------------------------------------------------
   
  -    public QueueBrowser createBrowser(Queue queue) throws JMSException
  +     public QueueBrowser createBrowser(Queue queue) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                                
  @@ -62,7 +63,7 @@
                return ((SpyQueueConnection)connection).createQueue(queueName);
        }
   
  -    public QueueReceiver createReceiver(Queue queue) throws JMSException
  +     public QueueReceiver createReceiver(Queue queue) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
   
  @@ -72,7 +73,7 @@
                return receiver;
        }
   
  -    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws 
JMSException
  +     public QueueReceiver createReceiver(Queue queue, String messageSelector) 
throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                                
  @@ -80,13 +81,13 @@
                return createReceiver(queue);
        }
   
  -    public QueueSender createSender(Queue queue) throws JMSException
  +     public QueueSender createSender(Queue queue) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                                
                return new SpyQueueSender(this,queue);
        }
  -    
  +     
        public TemporaryQueue createTemporaryQueue() throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
                                                                         
  @@ -108,8 +109,25 @@
        //called by a MessageProducer object which needs to send a message
        void sendMessage(SpyMessage m) throws JMSException
        {
  -             if (closed) throw new IllegalStateException("The session is closed");  
         
  -                                                                             
  +             if (closed) throw new IllegalStateException("The session is closed");
  +
  +             // If client is not doing persistence then we have to make sure the 
server
  +             // gets the persistent message before we return. (This is done in the 
commit for
  +             // transacted sessions.)
  +             if( !clientPersistence && transacted && 
m.getJMSDeliveryMode()==DeliveryMode.PERSISTENT) {
  +                     //Wait for the sending thread to sleep
  +                     synchronized (mutex) {
  +                             mutex.waitToSleep();
  +
  +                             SpyMessage job[] = { m };
  +                             connection.sendToServer( job );
  +                             
  +                             //We have finished our work, we can wake up the thread
  +                             mutex.notifyLock();
  +                     }
  +                     return;
  +             }
  +                     
                //Synchronize with the outgoingQueue
                synchronized (outgoingQueue) 
                {
  
  
  
  1.11      +29 -8     spyderMQ/src/java/org/spydermq/SpySession.java
  
  Index: SpySession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpySession.java,v
  retrieving revision 1.10
  retrieving revision 1.11
  diff -u -r1.10 -r1.11
  --- SpySession.java   2000/11/12 00:30:15     1.10
  +++ SpySession.java   2000/11/14 06:15:53     1.11
  @@ -29,7 +29,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.10 $
  + *   @version $Revision: 1.11 $
    */
   public class SpySession 
        implements Runnable, Session
  @@ -59,7 +59,9 @@
        public Mutex mutex;
        //Is this session in alpha mode ?
        public boolean alphaMode;
  -
  +     // Should we do client side persistence?
  +     public boolean clientPersistence = true;
  +     
        // Constructor ---------------------------------------------------             
    
   
        SpySession(SpyConnection conn, boolean trans, int acknowledge, boolean stop)
  @@ -261,7 +263,7 @@
                // allow other threads to process before closing this session
                // Patch submitted by John Ellis (10/29/00)
                Thread.yield();
  -                
  +                             
                if (closed) return;
                closed=true;
   
  @@ -300,9 +302,28 @@
                modeStop=true;
                
                //Wait for the thread to sleep
  -             synchronized (mutex) {
  -                     
  +             synchronized (mutex) {                  
                        mutex.waitToSleep();
  +
  +                     // If we are not doing client side persistence, then we have 
to send the
  +                     // persistent messages to server and confirm that they have 
been received before we return.
  +                     if( !clientPersistence ) {
  +
  +                             LinkedList persistentMessages = new LinkedList();
  +                             java.util.ListIterator iter = 
outgoingQueue.listIterator();
  +                             while( iter.hasNext() ) {
  +                                     SpyMessage sm = (SpyMessage)iter.next();
  +                                     if( sm.getJMSDeliveryMode() == 
javax.jms.DeliveryMode.PERSISTENT ) {
  +                                             persistentMessages.addLast(sm);
  +                                             iter.remove();
  +                                     }
  +                             }
  +                             if (persistentMessages.size()!=0) {
  +                                     SpyMessage job[]=new 
SpyMessage[persistentMessages.size()];
  +                                     
job=(SpyMessage[])persistentMessages.toArray(job);
  +                                     connection.sendToServer(job);
  +                             }
  +                     }
                        
                        //Move the outgoing messages from the outgoingQueue to the 
outgoingCommitedQueue
                        outgoingCommitedQueue.addAll(outgoingQueue);
  @@ -316,6 +337,7 @@
                                sessionQueue.commit();
                        }       
                        
  +                             
                        //We have finished our work, we can wake up the thread
                        modeStop=modeSav;
                        mutex.notifyLock();
  @@ -460,7 +482,7 @@
        //The connection has changed its mode (stop() or start())
        //We have to wait until message delivery has stopped or wake up the thread
        void notifyStopMode(boolean newValue)
  -    {
  +     {
                
                if (closed) throw new IllegalStateException("The session is closed");  
                                                                  
                if (modeStop==newValue) return; 
  @@ -480,6 +502,5 @@
                }
                
        }
  -     
  -     
  +             
   }
  
  
  
  1.6       +27 -10    spyderMQ/src/java/org/spydermq/SpyTopicSession.java
  
  Index: SpyTopicSession.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSession.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- SpyTopicSession.java      2000/07/07 22:37:21     1.5
  +++ SpyTopicSession.java      2000/11/14 06:15:54     1.6
  @@ -28,7 +28,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.5 $
  + *   @version $Revision: 1.6 $
    */
   public class SpyTopicSession 
        extends SpySession 
  @@ -44,21 +44,21 @@
   
        // Public --------------------------------------------------------
   
  -    public Topic createTopic(String topicName) throws JMSException
  +     public Topic createTopic(String topicName) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
   
                return ((SpyTopicConnection)connection).createTopic(topicName);
        }
   
  -    public TopicSubscriber createSubscriber(Topic topic) throws JMSException
  +     public TopicSubscriber createSubscriber(Topic topic) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                                
                return createSubscriber(topic,null,false);
        }
   
  -    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, 
boolean noLocal) throws JMSException
  +     public TopicSubscriber createSubscriber(Topic topic, String messageSelector, 
boolean noLocal) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                
  @@ -73,7 +73,7 @@
                return sub;
        }
   
  -    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws 
JMSException
  +     public TopicSubscriber createDurableSubscriber(Topic topic, String name) 
throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                                
  @@ -81,7 +81,7 @@
                return createSubscriber(topic);
        }
   
  -    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String 
messageSelector, boolean noLocal) throws JMSException
  +     public TopicSubscriber createDurableSubscriber(Topic topic, String name, 
String messageSelector, boolean noLocal) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                                
  @@ -89,20 +89,20 @@
                return createSubscriber(topic);
        }
   
  -    public TopicPublisher createPublisher(Topic topic) throws JMSException
  +     public TopicPublisher createPublisher(Topic topic) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
                                                                                
                return new SpyTopicPublisher(this,topic);
        }
  -    
  +     
        public TemporaryTopic createTemporaryTopic() throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
                                                                         
                return ((SpyTopicConnection)connection).getTemporaryTopic();
        }
   
  -    public void unsubscribe(String name) throws JMSException
  +     public void unsubscribe(String name) throws JMSException
        {
                //Not yet implemented
        }
  @@ -138,6 +138,23 @@
        void sendMessage(SpyMessage m) throws JMSException
        {
                if (closed) throw new IllegalStateException("The session is closed");  
         
  +
  +             // If client is not doing persistence then we have to make sure the 
server
  +             // gets the persistent message before we return. (This is done in the 
commit for
  +             // transacted sessions.)
  +             if( !clientPersistence && transacted && 
m.getJMSDeliveryMode()==javax.jms.DeliveryMode.PERSISTENT) {
  +                     //Wait for the sending thread to sleep
  +                     synchronized (mutex) {
  +                             mutex.waitToSleep();
  +
  +                             SpyMessage job[] = { m };
  +                             connection.sendToServer( job );
  +                             
  +                             //We have finished our work, we can wake up the thread
  +                             mutex.notifyLock();
  +                     }
  +                     return;
  +             }
                                                                                
                //Synchronize with the outgoingQueue
                synchronized (outgoingQueue) 
  @@ -172,7 +189,7 @@
                //Handle persistence
                //First shot : use a fs based persistence system
                try {
  -                     if (m.persistent) {
  +                     if (m.persistent && clientPersistence) {
                                //Log.log("ADD file "+m.getJMSMessageID());
                                if (m.removed==false) {
                                        ObjectOutputStream output=new 
ObjectOutputStream(new FileOutputStream(m.getJMSMessageID()));
  
  
  

Reply via email to