fhanik      2004/04/07 13:43:55

  Modified:    modules/cluster/src/share/org/apache/catalina/cluster
                        SessionMessage.java
               modules/cluster/src/share/org/apache/catalina/cluster/session
                        DeltaManager.java
               modules/cluster/src/share/org/apache/catalina/cluster/tcp
                        AsyncSocketSender.java ReplicationValve.java
                        SimpleTcpCluster.java
               modules/cluster/src/share/org/apache/catalina/cluster/util
                        SmartQueue.java
  Log:
  Two fixes for async replication
  1. Bug 28161 - The "smart" queue will now use the session message's
     getUniqueId() instead of the sessionId. This makes sure that no messages
     are overwritten when the queue fills up.
  
  2. The async replication thread will now properly exit when members join or
     leave the cluster.
  
  Revision  Changes    Path
  1.7       +18 -0     
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/SessionMessage.java
  
  Index: SessionMessage.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/SessionMessage.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- SessionMessage.java       27 Feb 2004 14:58:55 -0000      1.6
  +++ SessionMessage.java       7 Apr 2004 20:43:54 -0000       1.7
  @@ -104,6 +104,7 @@
       private Member mSrc;
       private String mContextName;
       private long serializationTimestamp;
  +    private String uniqueId;
   
   
       /**
  @@ -144,6 +145,17 @@
           mSession = session;
           mSessionID = sessionID;
           mContextName = contextName;
  +        uniqueId = sessionID;
  +    }
  +
  +    public SessionMessage( String contextName,
  +                           int eventtype,
  +                           byte[] session,
  +                           String sessionID,
  +                           String uniqueID)
  +    {
  +        this(contextName,eventtype,session,sessionID);
  +        uniqueId = uniqueID;
       }
   
       /**
  @@ -211,5 +223,11 @@
   
       public String getContextName() {
          return mContextName;
  +    }
  +    public String getUniqueId() {
  +        return uniqueId;
  +    }
  +    public void setUniqueId(String uniqueId) {
  +        this.uniqueId = uniqueId;
       }
   }//SessionMessage
  
  
  
  1.21      +6 -4      
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java
  
  Index: DeltaManager.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java,v
  retrieving revision 1.20
  retrieving revision 1.21
  diff -u -r1.20 -r1.21
  --- DeltaManager.java 7 Apr 2004 19:02:30 -0000       1.20
  +++ DeltaManager.java 7 Apr 2004 20:43:54 -0000       1.21
  @@ -767,13 +767,15 @@
      
                      byte[] data = unloadDeltaRequest(deltaRequest);
                      msg = new SessionMessage(name, SessionMessage.EVT_SESSION_DELTA,
  -                                            data, sessionId);
  +                                            data, sessionId,
  +                                            sessionId+System.currentTimeMillis());
                      session.resetDeltaRequest();
                  } else if ( !session.isPrimarySession() ) {
                      msg = new SessionMessage(getName(),
                                            SessionMessage.EVT_SESSION_ACCESSED,
                                            null,
  -                                         sessionId);
  +                                         sessionId,
  +                                         sessionId+System.currentTimeMillis());
                  }
                  session.setPrimarySession(true);
                  //check to see if we need to send out an access message
  @@ -783,7 +785,7 @@
                          msg = new SessionMessage(getName(),
                                                SessionMessage.EVT_SESSION_ACCESSED,
                                                null,
  -                                             sessionId);
  +                                             sessionId+System.currentTimeMillis());
                      }
                      
                  }
  
  
  
  1.6       +25 -5     
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java
  
  Index: AsyncSocketSender.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- AsyncSocketSender.java    27 Feb 2004 14:58:56 -0000      1.5
  +++ AsyncSocketSender.java    7 Apr 2004 20:43:55 -0000       1.6
  @@ -34,12 +34,12 @@
       private SmartQueue queue = new SmartQueue();
       private boolean suspect;
       
  +    private QueueThread queueThread = null;
  +    
       public AsyncSocketSender(InetAddress host, int port)  {
           this.address = host;
           this.port = port;
  -        QueueThread t = new QueueThread(this);
  -        t.setDaemon(true);
  -        t.start();
  +        checkThread();
           log.info("Started async sender thread for TCP replication.");
       }
   
  @@ -54,6 +54,16 @@
       public void connect() throws java.io.IOException  {
           sc = new Socket(getAddress(),getPort());
           isSocketConnected = true;
  +        checkThread();
  +        
  +    }
  +    
  +    protected void checkThread() {
  +        if ( queueThread == null ) {
  +            queueThread = new QueueThread(this);
  +            queueThread.setDaemon(true);
  +            queueThread.start();
  +        }
       }
   
       public void disconnect()  {
  @@ -63,6 +73,11 @@
           }catch ( Exception x)
           {}
           isSocketConnected = false;
  +        if ( queueThread != null ) {
  +            queueThread.stopRunning();
  +            queueThread = null;
  +        }
  +        
       }
   
       public boolean isConnected() {
  @@ -112,15 +127,20 @@
       
       private class QueueThread extends Thread {
           AsyncSocketSender sender;
  +        private boolean keepRunning = true;
   
           public QueueThread(AsyncSocketSender sender) {
               this.sender = sender;
               setName("Cluster-AsyncSocketSender-"+(threadCounter++));
           }
           
  +        public void stopRunning() {
  +            keepRunning = false;
  +        }
  +        
           public void run() {
  -            while (true) {
  -                SmartQueue.SmartEntry entry = sender.queue.remove();
  +            while (keepRunning) {
  +                SmartQueue.SmartEntry entry = sender.queue.remove(5000);
                   if ( entry != null ) {
                       try {
                           byte[] data = (byte[]) entry.getValue();
  
  
  
  1.12      +3 -1      
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java
  
  Index: ReplicationValve.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java,v
  retrieving revision 1.11
  retrieving revision 1.12
  diff -u -r1.11 -r1.12
  --- ReplicationValve.java     27 Feb 2004 14:58:56 -0000      1.11
  +++ ReplicationValve.java     7 Apr 2004 20:43:55 -0000       1.12
  @@ -132,7 +132,9 @@
           throws IOException, ServletException
       {
           //this happens before the request
  +        //long _debugstart = System.currentTimeMillis();
           context.invokeNext(request, response);
  +        //System.out.println("[DEBUG] Regular invoke 
took="+(System.currentTimeMillis()-_debugstart)+" ms.");
           //this happens after the request
           try
           {
  
  
  
  1.37      +3 -2      
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java
  
  Index: SimpleTcpCluster.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java,v
  retrieving revision 1.36
  retrieving revision 1.37
  diff -u -r1.36 -r1.37
  --- SimpleTcpCluster.java     27 Feb 2004 14:58:56 -0000      1.36
  +++ SimpleTcpCluster.java     7 Apr 2004 20:43:55 -0000       1.37
  @@ -400,7 +400,7 @@
                     }//end if
               }
               else {
  -                clusterSender.sendMessage(msg.getSessionID(),data);
  +                clusterSender.sendMessage(msg.getUniqueId(),data);
               }
           } catch ( Exception x ) {
               log.error("Unable to send message through cluster sender.",x);
  @@ -493,6 +493,7 @@
               if ( myobj != null && myobj instanceof SessionMessage ) {
                   
                   SessionMessage msg = (SessionMessage)myobj;
  +                log.debug("Assuming clocks are synched: Replication 
took="+(System.currentTimeMillis()-msg.getTimestamp())+" ms.");
                   String ctxname = msg.getContextName();
                   //check if the message is a EVT_GET_ALL_SESSIONS,
                   //if so, wait until we are fully started up
  
  
  
  1.4       +9 -2      
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/util/SmartQueue.java
  
  Index: SmartQueue.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/util/SmartQueue.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- SmartQueue.java   27 Feb 2004 14:58:56 -0000      1.3
  +++ SmartQueue.java   7 Apr 2004 20:43:55 -0000       1.4
  @@ -88,12 +88,19 @@
        * @return
        */
       public SmartEntry remove() {
  -        SmartEntry result = null;        
  +        return remove(0);
  +    }
  +    public SmartEntry remove(long timeout) {
  +        SmartEntry result = null; 
  +        long startEntry = System.currentTimeMillis();
           synchronized (mutex) {
               while ( size() == 0 ) {
                   try {
                       if ( debug != 0 ) 
log.debug("["+Thread.currentThread().getName()+"][SmartQueue] Queue sleeping until 
object added size="+size()+".");
  -                    mutex.wait();
  +                    if ( (timeout != 0) && 
((System.currentTimeMillis()-startEntry)>timeout) ) {
  +                        return null;
  +                    }
  +                    mutex.wait(timeout);
                       if ( debug != 0 ) 
log.debug("["+Thread.currentThread().getName()+"][SmartQueue] Queue woke up or 
interrupted size="+size()+".");
                   }
                   catch(IllegalMonitorStateException ex) {
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to