Author: chirino
Date: Wed Sep 17 09:59:42 2008
New Revision: 696370
URL: http://svn.apache.org/viewvc?rev=696370&view=rev
Log:
This fixes the recent errors the test cases have been seeing with transacted
acks due to the new ack assertion bits added.
We now take the mesage out of the dispatch list when the ack is received, but
we put it back on a rollback.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=696370&r1=696369&r2=696370&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Wed Sep 17 09:59:42 2008
@@ -197,6 +197,7 @@
}
if (inAckRange) {
// Don't remove the nodes until we are committed.
+ removeList.add(node);
if (!context.isInTransaction()) {
dequeueCounter++;
if (!this.getConsumerInfo().isBrowser()) {
@@ -205,7 +206,6 @@
if (!isSlave()) {
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
}
- removeList.add(node);
} else {
// setup a Synchronization to remove nodes from the
// dispatched list.
@@ -215,9 +215,7 @@
public void afterCommit()
throws Exception {
synchronized(dispatchLock) {
-
dequeueCounter++;
- dispatched.remove(node);
node
.getRegionDestination()
.getDestinationStatistics()
@@ -234,9 +232,11 @@
}
}
- public void afterRollback()
- throws Exception {
- super.afterRollback();
+ public void afterRollback() throws
Exception {
+ // Need to put it back in the
front.
+ synchronized(dispatchLock) {
+ dispatched.add(0, node);
+ }
}
});
}
@@ -426,12 +426,16 @@
boolean checkFoundStart = false;
boolean checkFoundEnd = false;
for (MessageReference node : dispatched) {
- if (!checkFoundStart && firstAckedMsg != null &&
firstAckedMsg.equals(node.getMessageId())) {
+
+ if( firstAckedMsg == null ) {
+ checkFoundStart=true;
+ } else if (!checkFoundStart &&
firstAckedMsg.equals(node.getMessageId())) {
checkFoundStart = true;
}
- if (checkFoundStart || firstAckedMsg == null)
+ if (checkFoundStart) {
checkCount++;
+ }
if (lastAckedMsg != null &&
lastAckedMsg.equals(node.getMessageId())) {
checkFoundEnd = true;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java?rev=696370&r1=696369&r2=696370&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
Wed Sep 17 09:59:42 2008
@@ -18,6 +18,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import javax.transaction.xa.XAException;
@@ -88,6 +89,7 @@
}
public void fireAfterRollback() throws Exception {
+ Collections.reverse(synchronizations);
for (Iterator<Synchronization> iter = synchronizations.iterator();
iter.hasNext();) {
Synchronization s = iter.next();
s.afterRollback();