fhanik 2004/04/07 13:43:55 Modified: modules/cluster/src/share/org/apache/catalina/cluster SessionMessage.java modules/cluster/src/share/org/apache/catalina/cluster/session DeltaManager.java modules/cluster/src/share/org/apache/catalina/cluster/tcp AsyncSocketSender.java ReplicationValve.java SimpleTcpCluster.java modules/cluster/src/share/org/apache/catalina/cluster/util SmartQueue.java Log: Two fixes for async replication 1. Bug 28161 - The "smart" queue will now use the session message getUniqueId instead of sessionId, this makes sure that no messages are overridden when the queue fills up 2. The async replication thread will now properly exit when members join or leave the cluster Revision Changes Path 1.7 +18 -0 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/SessionMessage.java Index: SessionMessage.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/SessionMessage.java,v retrieving revision 1.6 retrieving revision 1.7 diff -u -r1.6 -r1.7 --- SessionMessage.java 27 Feb 2004 14:58:55 -0000 1.6 +++ SessionMessage.java 7 Apr 2004 20:43:54 -0000 1.7 @@ -104,6 +104,7 @@ private Member mSrc; private String mContextName; private long serializationTimestamp; + private String uniqueId; /** @@ -144,6 +145,17 @@ mSession = session; mSessionID = sessionID; mContextName = contextName; + uniqueId = sessionID; + } + + public SessionMessage( String contextName, + int eventtype, + byte[] session, + String sessionID, + String uniqueID) + { + this(contextName,eventtype,session,sessionID); + uniqueId = uniqueID; } /** @@ -211,5 +223,11 @@ public String getContextName() { return mContextName; + } + public String getUniqueId() { + return uniqueId; + } + public void setUniqueId(String uniqueId) { + this.uniqueId = uniqueId; } }//SessionMessage 1.21 +6 -4 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java Index: DeltaManager.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java,v retrieving revision 1.20 retrieving revision 1.21 diff -u -r1.20 -r1.21 --- DeltaManager.java 7 Apr 2004 19:02:30 -0000 1.20 +++ DeltaManager.java 7 Apr 2004 20:43:54 -0000 1.21 @@ -767,13 +767,15 @@ byte[] data = unloadDeltaRequest(deltaRequest); msg = new SessionMessage(name, SessionMessage.EVT_SESSION_DELTA, - data, sessionId); + data, sessionId, + sessionId+System.currentTimeMillis()); session.resetDeltaRequest(); } else if ( !session.isPrimarySession() ) { msg = new SessionMessage(getName(), SessionMessage.EVT_SESSION_ACCESSED, null, - sessionId); + sessionId, + sessionId+System.currentTimeMillis()); } session.setPrimarySession(true); //check to see if we need to send out an access message @@ -783,7 +785,7 @@ msg = new SessionMessage(getName(), SessionMessage.EVT_SESSION_ACCESSED, null, - sessionId); + sessionId+System.currentTimeMillis()); } } 1.6 +25 -5 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java Index: AsyncSocketSender.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java,v retrieving revision 1.5 retrieving revision 1.6 diff -u -r1.5 -r1.6 --- AsyncSocketSender.java 27 Feb 2004 14:58:56 -0000 1.5 +++ AsyncSocketSender.java 7 Apr 2004 20:43:55 -0000 1.6 @@ -34,12 +34,12 @@ private SmartQueue queue = new SmartQueue(); private boolean suspect; + private QueueThread queueThread = null; + public AsyncSocketSender(InetAddress host, int port) { this.address = host; this.port = port; - QueueThread t = new QueueThread(this); - t.setDaemon(true); - t.start(); + checkThread(); log.info("Started async sender thread for TCP replication."); } @@ -54,6 +54,16 @@ public void connect() throws java.io.IOException { sc = new Socket(getAddress(),getPort()); isSocketConnected = true; + checkThread(); + + } + + protected void checkThread() { + if ( queueThread == null ) { + queueThread = new QueueThread(this); + queueThread.setDaemon(true); + queueThread.start(); + } } public void disconnect() { @@ -63,6 +73,11 @@ }catch ( Exception x) {} isSocketConnected = false; + if ( queueThread != null ) { + queueThread.stopRunning(); + queueThread = null; + } + } public boolean isConnected() { @@ -112,15 +127,20 @@ private class QueueThread extends Thread { AsyncSocketSender sender; + private boolean keepRunning = true; public QueueThread(AsyncSocketSender sender) { this.sender = sender; setName("Cluster-AsyncSocketSender-"+(threadCounter++)); } + public void stopRunning() { + keepRunning = false; + } + public void run() { - while (true) { - SmartQueue.SmartEntry entry = sender.queue.remove(); + while (keepRunning) { + SmartQueue.SmartEntry entry = sender.queue.remove(5000); if ( entry != null ) { try { byte[] data = (byte[]) entry.getValue(); 1.12 +3 -1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java Index: ReplicationValve.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java,v retrieving revision 1.11 retrieving revision 1.12 diff -u -r1.11 -r1.12 --- ReplicationValve.java 27 Feb 2004 14:58:56 -0000 1.11 +++ ReplicationValve.java 7 Apr 2004 20:43:55 -0000 1.12 @@ -132,7 +132,9 @@ throws IOException, ServletException { //this happens before the request + //long _debugstart = System.currentTimeMillis(); context.invokeNext(request, response); + //System.out.println("[DEBUG] Regular invoke took="+(System.currentTimeMillis()-_debugstart)+" ms."); //this happens after the request try { 1.37 +3 -2 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java Index: SimpleTcpCluster.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java,v retrieving revision 1.36 retrieving revision 1.37 diff -u -r1.36 -r1.37 --- SimpleTcpCluster.java 27 Feb 2004 14:58:56 -0000 1.36 +++ SimpleTcpCluster.java 7 Apr 2004 20:43:55 -0000 1.37 @@ -400,7 +400,7 @@ }//end if } else { - clusterSender.sendMessage(msg.getSessionID(),data); + clusterSender.sendMessage(msg.getUniqueId(),data); } } catch ( Exception x ) { log.error("Unable to send message through cluster sender.",x); @@ -493,6 +493,7 @@ if ( myobj != null && myobj instanceof SessionMessage ) { SessionMessage msg = (SessionMessage)myobj; + log.debug("Assuming clocks are synched: Replication took="+(System.currentTimeMillis()-msg.getTimestamp())+" ms."); String ctxname = msg.getContextName(); //check if the message is a EVT_GET_ALL_SESSIONS, //if so, wait until we are fully started up 1.4 +9 -2 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/util/SmartQueue.java Index: SmartQueue.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/util/SmartQueue.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- SmartQueue.java 27 Feb 2004 14:58:56 -0000 1.3 +++ SmartQueue.java 7 Apr 2004 20:43:55 -0000 1.4 @@ -88,12 +88,19 @@ * @return */ public SmartEntry remove() { - SmartEntry result = null; + return remove(0); + } + public SmartEntry remove(long timeout) { + SmartEntry result = null; + long startEntry = System.currentTimeMillis(); synchronized (mutex) { while ( size() == 0 ) { try { if ( debug != 0 ) log.debug("["+Thread.currentThread().getName()+"][SmartQueue] Queue sleeping until object added size="+size()+"."); - mutex.wait(); + if ( (timeout != 0) && ((System.currentTimeMillis()-startEntry)>timeout) ) { + return null; + } + mutex.wait(timeout); if ( debug != 0 ) log.debug("["+Thread.currentThread().getName()+"][SmartQueue] Queue woke up or interrupted size="+size()+"."); } catch(IllegalMonitorStateException ex) {
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]