Author: gtully
Date: Sat Oct 23 01:36:13 2010
New Revision: 1026543
URL: http://svn.apache.org/viewvc?rev=1026543&view=rev
Log:
additional tests and refactoring of fix for
https://issues.apache.org/activemq/browse/AMQ-2985; not updating
subscriptionAck with an unmatched message resolves recovery of unmatched
selector durables, reverting the previous change
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1026543&r1=1026542&r2=1026543&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
Sat Oct 23 01:36:13 2010
@@ -631,19 +631,22 @@ public class KahaDBStore extends Message
}
}
} else {
- doAcknowledge(context, subscriptionKey, messageId);
+ doAcknowledge(context, subscriptionKey, messageId, ack);
}
} else {
- doAcknowledge(context, subscriptionKey, messageId);
+ doAcknowledge(context, subscriptionKey, messageId, ack);
}
}
- protected void doAcknowledge(ConnectionContext context, String
subscriptionKey, MessageId messageId)
+ protected void doAcknowledge(ConnectionContext context, String
subscriptionKey, MessageId messageId, MessageAck ack)
throws IOException {
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey);
command.setMessageId(messageId.toString());
+ if (ack != null && ack.isUnmatchedAck()) {
+ command.setAck(UNMATCHED);
+ }
store(command, false, null, null);
}
@@ -737,7 +740,7 @@ public class KahaDBStore extends Message
selectorExpression =
SelectorParser.parse(selector);
}
sd.orderIndex.resetCursorPosition();
- sd.orderIndex.setBatch(tx, (selectorExpression ==
null ? cursorPos : -1));
+ sd.orderIndex.setBatch(tx, cursorPos);
for (Iterator<Entry<Long, MessageKeys>> iterator =
sd.orderIndex.iterator(tx); iterator
.hasNext();) {
Entry<Long, MessageKeys> entry =
iterator.next();
@@ -766,14 +769,13 @@ public class KahaDBStore extends Message
public void recoverSubscription(String clientId, String
subscriptionName, final MessageRecoveryListener listener)
throws Exception {
final String subscriptionKey = subscriptionKey(clientId,
subscriptionName);
- final SubscriptionInfo info = lookupSubscription(clientId,
subscriptionName);
indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Long cursorPos = sd.subscriptionAcks.get(tx,
subscriptionKey);
- sd.orderIndex.setBatch(tx, (info.getSelector() == null
? cursorPos : -1));
+ sd.orderIndex.setBatch(tx, cursorPos);
for (Iterator<Entry<Long, MessageKeys>> iterator =
sd.orderIndex.iterator(tx); iterator
.hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next();
@@ -790,7 +792,6 @@ public class KahaDBStore extends Message
public void recoverNextMessages(String clientId, String
subscriptionName, final int maxReturned,
final MessageRecoveryListener listener) throws Exception {
final String subscriptionKey = subscriptionKey(clientId,
subscriptionName);
- final SubscriptionInfo info = lookupSubscription(clientId,
subscriptionName);
indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
@@ -800,7 +801,7 @@ public class KahaDBStore extends Message
MessageOrderCursor moc =
sd.subscriptionCursors.get(subscriptionKey);
if (moc == null) {
long pos = sd.subscriptionAcks.get(tx,
subscriptionKey);
- sd.orderIndex.setBatch(tx, (info.getSelector() ==
null ? pos : -1));
+ sd.orderIndex.setBatch(tx, pos);
moc = sd.orderIndex.cursor;
} else {
sd.orderIndex.cursor.sync(moc);
@@ -1228,7 +1229,7 @@ public class KahaDBStore extends Message
// apply any acks we have
synchronized (this.subscriptionKeys) {
for (String key : this.subscriptionKeys) {
- this.topicStore.doAcknowledge(context, key,
this.message.getMessageId());
+ this.topicStore.doAcknowledge(context, key,
this.message.getMessageId(), null);
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1026543&r1=1026542&r2=1026543&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Sat Oct 23 01:36:13 2010
@@ -88,6 +88,7 @@ public class MessageDatabase extends Ser
public static final String PROPERTY_LOG_SLOW_ACCESS_TIME =
"org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
public static final int LOG_SLOW_ACCESS_TIME =
Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "0"));
+ protected static final Buffer UNMATCHED = new Buffer(new byte[]{});
private static final Log LOG = LogFactory.getLog(MessageDatabase.class);
private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -1037,11 +1038,14 @@ public class MessageDatabase extends Ser
String subscriptionKey = command.getSubscriptionKey();
Long prev = sd.subscriptionAcks.put(tx, subscriptionKey,
sequence);
+ if (command.getAck() == UNMATCHED) {
+ sd.subscriptionAcks.put(tx, subscriptionKey, prev);
+ }
+ // The following method handles deleting un-referenced
messages.
+ removeAckLocation(tx, sd, subscriptionKey, prev);
+
// Add it to the new location set.
addAckLocation(sd, sequence, subscriptionKey);
-
- // The following method handles deleting un-referenced
messages.
- removeAckLocation(tx, sd, subscriptionKey, sequence);
}
}
@@ -1506,16 +1510,24 @@ public class MessageDatabase extends Ser
if (hs != null) {
hs.remove(subscriptionKey);
if (hs.isEmpty()) {
+ HashSet<String> firstSet =
sd.ackPositions.values().iterator().next();
sd.ackPositions.remove(sequenceId);
- ArrayList<Entry<Long, MessageKeys>> deletes = new
ArrayList<Entry<Long, MessageKeys>>();
- sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
-
- // Do the actual delete.
- for (Entry<Long, MessageKeys> entry : deletes) {
- sd.locationIndex.remove(tx, entry.getValue().location);
- sd.messageIdIndex.remove(tx,
entry.getValue().messageId);
- sd.orderIndex.remove(tx, entry.getKey());
+ // Did we just empty out the first set in the
+ // ordered list of ack locations? Then it's time to
+ // delete some messages.
+ if (hs == firstSet) {
+
+ // Find all the entries that need to get deleted.
+ ArrayList<Entry<Long, MessageKeys>> deletes = new
ArrayList<Entry<Long, MessageKeys>>();
+ sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
+
+ // Do the actual deletes.
+ for (Entry<Long, MessageKeys> entry : deletes) {
+ sd.locationIndex.remove(tx,
entry.getValue().location);
+
sd.messageIdIndex.remove(tx,entry.getValue().messageId);
+ sd.orderIndex.remove(tx,entry.getKey());
+ }
}
}
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1026543&r1=1026542&r2=1026543&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
Sat Oct 23 01:36:13 2010
@@ -43,8 +43,12 @@ public class DurableSubscriptionOfflineT
@Override
protected Connection createConnection() throws Exception {
+ return createConnection("cliName");
+ }
+
+ protected Connection createConnection(String name) throws Exception {
Connection con = super.createConnection();
- con.setClientID("cliName");
+ con.setClientID(name);
con.start();
return con;
}
@@ -190,9 +194,8 @@ public class DurableSubscriptionOfflineT
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(null);
- final int numMessages = 10;
int sent = 0;
- for (int i = 0; i < numMessages; i++) {
+ for (int i = 0; i < 10; i++) {
sent++;
Message message = session.createMessage();
message.setStringProperty("filter", "true");
@@ -217,7 +220,7 @@ public class DurableSubscriptionOfflineT
con.close();
LOG.info("Consumed: " + listener.count);
- assertEquals(numMessages, listener.count);
+ assertEquals(sent, listener.count);
// consume messages again, should not get any
con = createConnection();
@@ -233,6 +236,60 @@ public class DurableSubscriptionOfflineT
assertEquals(0, listener.count);
}
+
+ public void testOfflineSubscription4() throws Exception {
+ // create durable subscription 1
+ Connection con = createConnection("cliId1");
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", "filter = 'true'",
true);
+ session.close();
+ con.close();
+
+ // create durable subscription 2
+ Connection con2 = createConnection("cliId2");
+ Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer2 = session2.createDurableSubscriber(topic,
"SubsId", "filter = 'true'", true);
+ Listener listener2 = new Listener();
+ consumer2.setMessageListener(listener2);
+
+ // send messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(null);
+
+ int sent = 0;
+ for (int i = 0; i < 10; i++) {
+ sent++;
+ Message message = session.createMessage();
+ message.setStringProperty("filter", "true");
+ producer.send(topic, message);
+ }
+
+ Thread.sleep(1 * 1000);
+ session.close();
+ con.close();
+
+ // test online subs
+ Thread.sleep(3 * 1000);
+ session2.close();
+ con2.close();
+
+ assertEquals(sent, listener2.count);
+
+ // consume messages
+ con = createConnection("cliId1");
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createDurableSubscriber(topic,
"SubsId", "filter = 'true'", true);
+ Listener listener = new Listener();
+ consumer.setMessageListener(listener);
+
+ Thread.sleep(3 * 1000);
+
+ session.close();
+ con.close();
+
+ assertEquals("offline consumer got all", sent, listener.count);
+ }
public static class Listener implements MessageListener {
int count = 0;