Author: markt Date: Sat Nov 15 10:13:08 2008 New Revision: 717903 URL: http://svn.apache.org/viewvc?rev=717903&view=rev Log: Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=45851 Fix out of order message processing issues
Modified: tomcat/tc6.0.x/trunk/ (props changed) tomcat/tc6.0.x/trunk/STATUS.txt tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/FileMessageFactory.java tomcat/tc6.0.x/trunk/webapps/docs/changelog.xml Propchange: tomcat/tc6.0.x/trunk/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Sat Nov 15 10:13:08 2008 @@ -1 +1 @@ -/tomcat/trunk:673796,673820,683982,684001,684081,684234,684269-684270,687503,687645,690781,691392,691805,692748,695053,695311,696780,696782,698012,698227,698236,698613,711126 +/tomcat/trunk:673796,673820,683982,684001,684081,684234,684269-684270,687503,687645,690781,691392,691805,692748,695053,695311,696780,696782,698012,698227,698236,698613,699427,711126 Modified: tomcat/tc6.0.x/trunk/STATUS.txt URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/STATUS.txt?rev=717903&r1=717902&r2=717903&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/STATUS.txt (original) +++ tomcat/tc6.0.x/trunk/STATUS.txt Sat Nov 15 10:13:08 2008 @@ -114,13 +114,6 @@ +1: rjung, mturk, markt, pero 0: remm (also affects to the two other AJP connectors) -* Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=45851 - Fix NPE and out of order message processing issues - http://svn.apache.org/viewvc?rev=699427&view=rev - +1: markt, fhanik, pero - 0: - -1: - * Fix serialisation issue reported by Find Bugs http://svn.apache.org/viewvc?rev=699633&view=rev +1: markt @@ -248,7 +241,3 @@ +1: markt, fhanik -1: -* Don't swallow input if we know the connection is going to be closed - http://svn.apache.org/viewvc?rev=714214&view=rev - +1: billbarker - -1: Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/FileMessageFactory.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/FileMessageFactory.java?rev=717903&r1=717902&r2=717903&view=diff 
============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/FileMessageFactory.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/FileMessageFactory.java Sat Nov 15 10:13:08 2008 @@ -22,6 +22,9 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.FileNotFoundException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; /** * This factory is used to read files and write files by splitting them up into @@ -74,7 +77,7 @@ protected FileOutputStream out; /** - * The number of messages we have read or written + * The number of messages we have written */ protected int nrOfMessagesProcessed = 0; @@ -87,6 +90,19 @@ * The total number of packets that we split this file into */ protected long totalNrOfMessages = 0; + + /** + * The number of the last message processed. Message IDs are 1 based. + */ + protected AtomicLong lastMessageProcessed = new AtomicLong(0); + + /** + * Messages received out of order are held in the buffer until required. If + * everything is working as expected, messages will spend very little time in + * the buffer. + */ + protected Map<Long, FileMessage> msgBuffer = + new ConcurrentHashMap<Long, FileMessage>(); /** * The bytes that we hold the data in, not thread safe. @@ -94,6 +110,12 @@ protected byte[] data = new byte[READ_SIZE]; /** + * Flag that indicates if a thread is writing messages to disk. Access to + * this flag must be synchronised. + */ + protected boolean isWriting = false; + + /** * Private constructor, either instantiates a factory to read or write. <BR> * When openForWrite==true, then a the file, f, will be created and an * output stream is opened to write to it. 
<BR> @@ -205,25 +227,65 @@ if (log.isDebugEnabled()) log.debug("Message " + msg + " data " + msg.getData() + " data length " + msg.getDataLength() + " out " + out); - if (out != null) { - out.write(msg.getData(), 0, msg.getDataLength()); - nrOfMessagesProcessed++; + + if (msg.getMessageNumber() <= lastMessageProcessed.get()) { + // Duplicate of message already processed + log.warn("Receive Message again -- Sender ActTimeout too short [ path: " + + msg.getContextPath() + + " war: " + + msg.getFileName() + + " data: " + + msg.getData() + + " data length: " + msg.getDataLength() + " ]"); + return false; + } + + FileMessage previous = + msgBuffer.put(new Long(msg.getMessageNumber()), msg); + if (previous !=null) { + // Duplicate of message not yet processed + log.warn("Receive Message again -- Sender ActTimeout too short [ path: " + + msg.getContextPath() + + " war: " + + msg.getFileName() + + " data: " + + msg.getData() + + " data length: " + msg.getDataLength() + " ]"); + return false; + } + + FileMessage next = null; + synchronized (this) { + if (!isWriting) { + next = msgBuffer.get(new Long(lastMessageProcessed.get() + 1)); + if (next != null) { + isWriting = true; + } else { + return false; + } + } else { + return false; + } + } + + while (next != null) { + out.write(next.getData(), 0, next.getDataLength()); + lastMessageProcessed.incrementAndGet(); out.flush(); - if (msg.getMessageNumber() == msg.getTotalNrOfMsgs()) { + if (next.getMessageNumber() == next.getTotalNrOfMsgs()) { out.close(); cleanup(); return true; - }//end if - } else { - if (log.isWarnEnabled()) - log.warn("Receive Message again -- Sender ActTimeout to short [ path: " - + msg.getContextPath() - + " war: " - + msg.getFileName() - + " data: " - + msg.getData() - + " data length: " + msg.getDataLength() + " ]"); + } + synchronized(this) { + next = + msgBuffer.get(new Long(lastMessageProcessed.get() + 1)); + if (next == null) { + isWriting = false; + } + } } + return false; }//writeMessage @@ -248,6 
+310,8 @@ data = null; nrOfMessagesProcessed = 0; totalNrOfMessages = 0; + msgBuffer.clear(); + lastMessageProcessed = null; } /** @@ -309,4 +373,4 @@ return file; } -} \ No newline at end of file +} Modified: tomcat/tc6.0.x/trunk/webapps/docs/changelog.xml URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/webapps/docs/changelog.xml?rev=717903&r1=717902&r2=717903&view=diff ============================================================================== --- tomcat/tc6.0.x/trunk/webapps/docs/changelog.xml (original) +++ tomcat/tc6.0.x/trunk/webapps/docs/changelog.xml Sat Nov 15 10:13:08 2008 @@ -249,6 +249,10 @@ <bug>45618</bug>: Make sure NIO selector is closed when no longer used. Unlikely to be an issue in normal usage. (markt) </fix> + <fix> + <bug>45851</bug>: Fix out of order message processing issues with the + FarmWarDeployer. (markt) + </fix> </changelog> </subsection> <subsection name="Webapps"> --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]