Author: markt
Date: Sat Nov 15 10:13:08 2008
New Revision: 717903
URL: http://svn.apache.org/viewvc?rev=717903&view=rev
Log:
Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=45851
Fix out of order message processing issues
Modified:
tomcat/tc6.0.x/trunk/ (props changed)
tomcat/tc6.0.x/trunk/STATUS.txt
tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/FileMessageFactory.java
tomcat/tc6.0.x/trunk/webapps/docs/changelog.xml
Propchange: tomcat/tc6.0.x/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 15 10:13:08 2008
@@ -1 +1 @@
-/tomcat/trunk:673796,673820,683982,684001,684081,684234,684269-684270,687503,687645,690781,691392,691805,692748,695053,695311,696780,696782,698012,698227,698236,698613,711126
+/tomcat/trunk:673796,673820,683982,684001,684081,684234,684269-684270,687503,687645,690781,691392,691805,692748,695053,695311,696780,696782,698012,698227,698236,698613,699427,711126
Modified: tomcat/tc6.0.x/trunk/STATUS.txt
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/STATUS.txt?rev=717903&r1=717902&r2=717903&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/STATUS.txt (original)
+++ tomcat/tc6.0.x/trunk/STATUS.txt Sat Nov 15 10:13:08 2008
@@ -114,13 +114,6 @@
+1: rjung, mturk, markt, pero
0: remm (also affects to the two other AJP connectors)
-* Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=45851
- Fix NPE and out of order message processing issues
- http://svn.apache.org/viewvc?rev=699427&view=rev
- +1: markt, fhanik, pero
- 0:
- -1:
-
* Fix serialisation issue reported by Find Bugs
http://svn.apache.org/viewvc?rev=699633&view=rev
+1: markt
@@ -248,7 +241,3 @@
+1: markt, fhanik
-1:
-* Don't swallow input if we know the connection is going to be closed
- http://svn.apache.org/viewvc?rev=714214&view=rev
- +1: billbarker
- -1:
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/FileMessageFactory.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/FileMessageFactory.java?rev=717903&r1=717902&r2=717903&view=diff
==============================================================================
---
tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/FileMessageFactory.java
(original)
+++
tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/FileMessageFactory.java
Sat Nov 15 10:13:08 2008
@@ -22,6 +22,9 @@
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileNotFoundException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
/**
* This factory is used to read files and write files by splitting them up into
@@ -74,7 +77,7 @@
protected FileOutputStream out;
/**
- * The number of messages we have read or written
+ * The number of messages we have written
*/
protected int nrOfMessagesProcessed = 0;
@@ -87,6 +90,19 @@
* The total number of packets that we split this file into
*/
protected long totalNrOfMessages = 0;
+
+ /**
+ * The number of the last message processed. Message IDs are 1 based.
+ */
+ protected AtomicLong lastMessageProcessed = new AtomicLong(0);
+
+ /**
+ * Messages received out of order are held in the buffer until required. If
+ * everything works as expected, messages will spend very little time in
+ * the buffer.
+ */
+ protected Map<Long, FileMessage> msgBuffer =
+ new ConcurrentHashMap<Long, FileMessage>();
/**
* The bytes that we hold the data in, not thread safe.
@@ -94,6 +110,12 @@
protected byte[] data = new byte[READ_SIZE];
/**
+ * Flag that indicates if a thread is writing messages to disk. Access to
+ * this flag must be synchronised.
+ */
+ protected boolean isWriting = false;
+
+ /**
* Private constructor, either instantiates a factory to read or write.
<BR>
* When openForWrite==true, then the file, f, will be created and an
* output stream is opened to write to it. <BR>
@@ -205,25 +227,65 @@
if (log.isDebugEnabled())
log.debug("Message " + msg + " data " + msg.getData()
+ " data length " + msg.getDataLength() + " out " + out);
- if (out != null) {
- out.write(msg.getData(), 0, msg.getDataLength());
- nrOfMessagesProcessed++;
+
+ if (msg.getMessageNumber() <= lastMessageProcessed.get()) {
+ // Duplicate of message already processed
+ log.warn("Receive Message again -- Sender ActTimeout too short [
path: "
+ + msg.getContextPath()
+ + " war: "
+ + msg.getFileName()
+ + " data: "
+ + msg.getData()
+ + " data length: " + msg.getDataLength() + " ]");
+ return false;
+ }
+
+ FileMessage previous =
+ msgBuffer.put(new Long(msg.getMessageNumber()), msg);
+ if (previous !=null) {
+ // Duplicate of message not yet processed
+ log.warn("Receive Message again -- Sender ActTimeout too short [
path: "
+ + msg.getContextPath()
+ + " war: "
+ + msg.getFileName()
+ + " data: "
+ + msg.getData()
+ + " data length: " + msg.getDataLength() + " ]");
+ return false;
+ }
+
+ FileMessage next = null;
+ synchronized (this) {
+ if (!isWriting) {
+ next = msgBuffer.get(new Long(lastMessageProcessed.get() + 1));
+ if (next != null) {
+ isWriting = true;
+ } else {
+ return false;
+ }
+ } else {
+ return false;
+ }
+ }
+
+ while (next != null) {
+ out.write(next.getData(), 0, next.getDataLength());
+ lastMessageProcessed.incrementAndGet();
out.flush();
- if (msg.getMessageNumber() == msg.getTotalNrOfMsgs()) {
+ if (next.getMessageNumber() == next.getTotalNrOfMsgs()) {
out.close();
cleanup();
return true;
- }//end if
- } else {
- if (log.isWarnEnabled())
- log.warn("Receive Message again -- Sender ActTimeout to short
[ path: "
- + msg.getContextPath()
- + " war: "
- + msg.getFileName()
- + " data: "
- + msg.getData()
- + " data length: " + msg.getDataLength() + "
]");
+ }
+ synchronized(this) {
+ next =
+ msgBuffer.get(new Long(lastMessageProcessed.get() + 1));
+ if (next == null) {
+ isWriting = false;
+ }
+ }
}
+
return false;
}//writeMessage
@@ -248,6 +310,8 @@
data = null;
nrOfMessagesProcessed = 0;
totalNrOfMessages = 0;
+ msgBuffer.clear();
+ lastMessageProcessed = null;
}
/**
@@ -309,4 +373,4 @@
return file;
}
-}
\ No newline at end of file
+}
Modified: tomcat/tc6.0.x/trunk/webapps/docs/changelog.xml
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/webapps/docs/changelog.xml?rev=717903&r1=717902&r2=717903&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/tc6.0.x/trunk/webapps/docs/changelog.xml Sat Nov 15 10:13:08 2008
@@ -249,6 +249,10 @@
<bug>45618</bug>: Make sure NIO selector is closed when no longer used.
Unlikely to be an issue in normal usage. (markt)
</fix>
+ <fix>
+ <bug>45851</bug>: Fix out of order message processing issues with the
+ FarmWarDeployer. (markt)
+ </fix>
</changelog>
</subsection>
<subsection name="Webapps">
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]