Author: gtully
Date: Fri Oct 22 19:31:18 2010
New Revision: 1026457
URL: http://svn.apache.org/viewvc?rev=1026457&view=rev
Log:
additional tests and refactoring of fix for
https://issues.apache.org/activemq/browse/AMQ-2985, selector scan was off by
one and unmatched acking skipped some messages leaving them available to
subsequent consumers. Also acking always left the first consumed message
available for a reconnected durable
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1026457&r1=1026456&r2=1026457&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Oct 22 19:31:18 2010
@@ -1655,7 +1655,8 @@ public class Queue extends BaseDestinati
if (LOG.isDebugEnabled()) {
LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn
+ ", Inflight: "
+ destinationStatistics.getInflight().getCount() + ",
pagedInMessages.size "
- + pagedInMessages.size() + ", enqueueSize: " +
destinationStatistics.getEnqueues().getCount());
+ + pagedInMessages.size() + ", enqueueCount: " +
destinationStatistics.getEnqueues().getCount()
+ + ", dequeueCount: " +
destinationStatistics.getDequeues().getCount());
}
if (isLazyDispatch() && !force) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1026457&r1=1026456&r2=1026457&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
Fri Oct 22 19:31:18 2010
@@ -737,7 +737,7 @@ public class KahaDBStore extends Message
selectorExpression =
SelectorParser.parse(selector);
}
sd.orderIndex.resetCursorPosition();
- sd.orderIndex.setBatch(tx, (selectorExpression !=
null? 0 : cursorPos));
+ sd.orderIndex.setBatch(tx, (selectorExpression ==
null ? cursorPos : -1));
for (Iterator<Entry<Long, MessageKeys>> iterator =
sd.orderIndex.iterator(tx); iterator
.hasNext();) {
Entry<Long, MessageKeys> entry =
iterator.next();
@@ -773,7 +773,7 @@ public class KahaDBStore extends Message
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Long cursorPos = sd.subscriptionAcks.get(tx,
subscriptionKey);
- sd.orderIndex.setBatch(tx, (info.getSelector() == null
? cursorPos : 0));
+ sd.orderIndex.setBatch(tx, (info.getSelector() == null
? cursorPos : -1));
for (Iterator<Entry<Long, MessageKeys>> iterator =
sd.orderIndex.iterator(tx); iterator
.hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next();
@@ -800,7 +800,7 @@ public class KahaDBStore extends Message
MessageOrderCursor moc =
sd.subscriptionCursors.get(subscriptionKey);
if (moc == null) {
long pos = sd.subscriptionAcks.get(tx,
subscriptionKey);
- sd.orderIndex.setBatch(tx, (info.getSelector() ==
null ? pos : 0));
+ sd.orderIndex.setBatch(tx, (info.getSelector() ==
null ? pos : -1));
moc = sd.orderIndex.cursor;
} else {
sd.orderIndex.cursor.sync(moc);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1026457&r1=1026456&r2=1026457&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Fri Oct 22 19:31:18 2010
@@ -1037,11 +1037,11 @@ public class MessageDatabase extends Ser
String subscriptionKey = command.getSubscriptionKey();
Long prev = sd.subscriptionAcks.put(tx, subscriptionKey,
sequence);
- // The following method handles deleting un-referenced
messages.
- removeAckLocation(tx, sd, subscriptionKey, prev);
-
// Add it to the new location set.
addAckLocation(sd, sequence, subscriptionKey);
+
+ // The following method handles deleting un-referenced
messages.
+ removeAckLocation(tx, sd, subscriptionKey, sequence);
}
}
@@ -1152,16 +1152,17 @@ public class MessageDatabase extends Ser
break;
}
}
+ LOG.trace("gc candidates after first tx:" +
firstTxLocation.getDataFileId() + ", " + gcCandidateSet);
}
// Go through all the destinations to see if any of them can
remove GC candidates.
- for (StoredDestination sd : storedDestinations.values()) {
+ for (Entry<String, StoredDestination> entry :
storedDestinations.entrySet()) {
if( gcCandidateSet.isEmpty() ) {
break;
}
// Use a visitor to cut down the number of pages that we load
- sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
+ entry.getValue().locationIndex.visit(tx, new
BTreeVisitor<Location, Long>() {
int last=-1;
public boolean isInterestedInKeysBetween(Location first,
Location second) {
if( first==null ) {
@@ -1199,10 +1200,11 @@ public class MessageDatabase extends Ser
}
});
+ LOG.trace("gc candidates after dest:" + entry.getKey() + ", "
+ gcCandidateSet);
}
// check we are not deleting file with ack for in-use journal files
- LOG.debug("gc candidates: " + gcCandidateSet);
+ LOG.trace("gc candidates: " + gcCandidateSet);
final TreeSet<Integer> gcCandidates = new
TreeSet<Integer>(gcCandidateSet);
Iterator<Integer> candidates = gcCandidateSet.iterator();
while (candidates.hasNext()) {
@@ -1219,7 +1221,7 @@ public class MessageDatabase extends Ser
if (gcCandidateSet.contains(candidate)) {
ackMessageFileMap.remove(candidate);
} else {
- LOG.debug("not removing data file: " + candidate
+ LOG.trace("not removing data file: " + candidate
+ " as contained ack(s) refer to referenced
file: " + referencedFileIds);
}
}
@@ -1504,24 +1506,16 @@ public class MessageDatabase extends Ser
if (hs != null) {
hs.remove(subscriptionKey);
if (hs.isEmpty()) {
- HashSet<String> firstSet =
sd.ackPositions.values().iterator().next();
sd.ackPositions.remove(sequenceId);
- // Did we just empty out the first set in the
- // ordered list of ack locations? Then it's time to
- // delete some messages.
- if (hs == firstSet) {
-
- // Find all the entries that need to get deleted.
- ArrayList<Entry<Long, MessageKeys>> deletes = new
ArrayList<Entry<Long, MessageKeys>>();
- sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
-
- // Do the actual deletes.
- for (Entry<Long, MessageKeys> entry : deletes) {
- sd.locationIndex.remove(tx,
entry.getValue().location);
-
sd.messageIdIndex.remove(tx,entry.getValue().messageId);
- sd.orderIndex.remove(tx,entry.getKey());
- }
+ ArrayList<Entry<Long, MessageKeys>> deletes = new
ArrayList<Entry<Long, MessageKeys>>();
+ sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
+
+ // Do the actual delete.
+ for (Entry<Long, MessageKeys> entry : deletes) {
+ sd.locationIndex.remove(tx, entry.getValue().location);
+ sd.messageIdIndex.remove(tx,
entry.getValue().messageId);
+ sd.orderIndex.remove(tx, entry.getKey());
}
}
}
@@ -2033,19 +2027,10 @@ public class MessageDatabase extends Ser
void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>>
deletes,
BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws
IOException {
- for (Iterator<Entry<Long, MessageKeys>> iterator =
index.iterator(tx); iterator.hasNext();) {
- Entry<Long, MessageKeys> entry = iterator.next();
- if (entry.getKey().compareTo(sequenceId) == 0) {
- // We don't do the actually delete while we are
- // iterating the BTree since
- // iterating would fail.
- deletes.add(entry);
- } else {
- // no point in iterating the in-order sequences anymore
- break;
- }
- }
- }
+
+ Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx,
sequenceId);
+ deletes.add(iterator.next());
+ }
long getNextMessageId(int priority) {
return nextMessageId++;
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1026457&r1=1026456&r2=1026457&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
Fri Oct 22 19:31:18 2010
@@ -26,10 +26,13 @@ import org.apache.activemq.command.Activ
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import javax.jms.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import java.io.File;
public class DurableSubscriptionOfflineTest extends
org.apache.activemq.TestSupport {
+ private static final Log LOG =
LogFactory.getLog(DurableSubscriptionOfflineTest.class);
public Boolean usePrioritySupport = Boolean.TRUE;
private BrokerService broker;
private ActiveMQTopic topic;
@@ -65,6 +68,7 @@ public class DurableSubscriptionOfflineT
broker = BrokerFactory.createBroker("broker:(vm://" + getName(true)
+")");
broker.setBrokerName(getName(true));
broker.setDeleteAllMessagesOnStartup(true);
+ broker.getManagementContext().setCreateConnector(false);
if (usePrioritySupport) {
PolicyEntry policy = new PolicyEntry();
@@ -132,6 +136,104 @@ public class DurableSubscriptionOfflineT
assertEquals(sent, listener.count);
}
+ public void testOfflineSubscription2() throws Exception {
+ // create durable subscription
+ Connection con = createConnection();
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", "filter = 'true'",
true);
+ session.close();
+ con.close();
+
+ // send messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(null);
+
+ int sent = 0;
+ for (int i = 0; i < 10; i++) {
+ sent++;
+ Message message = session.createMessage();
+ message.setStringProperty("filter", "true");
+ producer.send(topic, message);
+ }
+
+ Thread.sleep(1 * 1000);
+
+ session.close();
+ con.close();
+
+ // consume messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createDurableSubscriber(topic,
"SubsId", "filter = 'true'", true);
+ Listener listener = new Listener();
+ consumer.setMessageListener(listener);
+
+ Thread.sleep(3 * 1000);
+
+ session.close();
+ con.close();
+
+ assertEquals(sent, listener.count);
+ }
+
+ public void testOfflineSubscription3() throws Exception {
+ // create durable subscription
+ Connection con = createConnection();
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", "filter = 'true'",
true);
+ session.close();
+ con.close();
+
+ // send messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(null);
+
+ final int numMessages = 10;
+ int sent = 0;
+ for (int i = 0; i < numMessages; i++) {
+ sent++;
+ Message message = session.createMessage();
+ message.setStringProperty("filter", "true");
+ producer.send(topic, message);
+ }
+
+ Thread.sleep(1 * 1000);
+
+ session.close();
+ con.close();
+
+ // consume messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createDurableSubscriber(topic,
"SubsId", "filter = 'true'", true);
+ Listener listener = new Listener();
+ consumer.setMessageListener(listener);
+
+ Thread.sleep(3 * 1000);
+
+ session.close();
+ con.close();
+
+ LOG.info("Consumed: " + listener.count);
+ assertEquals(numMessages, listener.count);
+
+ // consume messages again, should not get any
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumer = session.createDurableSubscriber(topic, "SubsId", "filter =
'true'", true);
+ listener = new Listener();
+ consumer.setMessageListener(listener);
+
+ Thread.sleep(3 * 1000);
+
+ session.close();
+ con.close();
+
+ assertEquals(0, listener.count);
+ }
+
public static class Listener implements MessageListener {
int count = 0;