Author: chirino
Date: Mon Oct 1 13:02:18 2007
New Revision: 581053
URL: http://svn.apache.org/viewvc?rev=581053&view=rev
Log:
Fix for AMQ-1095:
- Added contributed test cases
- We now filter out non-matching messages as they are loaded into the
TopicStorePrefetch
- Changed the TopicStorePrefetch and StoreDurableSubscriberCursor so that they
  don't depend on the pending message counter, since some stores cannot give
  an accurate count for it.
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java
(with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/MessageSelectorTest.java
(with props)
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
Mon Oct 1 13:02:18 2007
@@ -48,7 +48,8 @@
public DurableTopicSubscription(Broker broker, SystemUsage usageManager,
ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
throws InvalidSelectorException {
- super(broker, context, info, new
StoreDurableSubscriberCursor(context.getClientId(), info.getSubscriptionName(),
broker.getTempDataStore(), info.getPrefetchSize()));
+ super(broker, context, info);
+ this.pending = new StoreDurableSubscriberCursor(context.getClientId(),
info.getSubscriptionName(), broker.getTempDataStore(), info.getPrefetchSize(),
this);
this.usageManager = usageManager;
this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(),
info.getSubscriptionName());
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Mon Oct 1 13:02:18 2007
@@ -410,6 +410,7 @@
if (message == null) {
return false;
}
+
// Make sure we can dispatch a message.
if (canDispatch(node) && !isSlave()) {
MessageDispatch md = createMessageDispatch(node, message);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
Mon Oct 1 13:02:18 2007
@@ -26,6 +26,7 @@
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
import org.apache.activemq.kaha.Store;
@@ -42,7 +43,6 @@
public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor
{
private static final Log LOG =
LogFactory.getLog(StoreDurableSubscriberCursor.class);
- private int pendingCount;
private String clientId;
private String subscriberName;
private Map<Destination, TopicStorePrefetch> topics = new
HashMap<Destination, TopicStorePrefetch>();
@@ -50,6 +50,7 @@
private boolean started;
private PendingMessageCursor nonPersistent;
private PendingMessageCursor currentCursor;
+ private final Subscription subscription;
/**
* @param topic
@@ -57,9 +58,10 @@
* @param subscriberName
* @throws IOException
*/
- public StoreDurableSubscriberCursor(String clientId, String
subscriberName, Store store, int maxBatchSize) {
+ public StoreDurableSubscriberCursor(String clientId, String
subscriberName, Store store, int maxBatchSize, Subscription subscription) {
this.clientId = clientId;
this.subscriberName = subscriberName;
+ this.subscription = subscription;
this.nonPersistent = new FilePendingMessageCursor(clientId +
subscriberName, store);
storePrefetches.add(nonPersistent);
}
@@ -69,7 +71,6 @@
started = true;
for (PendingMessageCursor tsp : storePrefetches) {
tsp.start();
- pendingCount += tsp.size();
}
}
}
@@ -80,8 +81,6 @@
for (PendingMessageCursor tsp : storePrefetches) {
tsp.stop();
}
-
- pendingCount = 0;
}
}
@@ -94,14 +93,13 @@
*/
public synchronized void add(ConnectionContext context, Destination
destination) throws Exception {
if (destination != null &&
!AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
- TopicStorePrefetch tsp = new
TopicStorePrefetch((Topic)destination, clientId, subscriberName);
+ TopicStorePrefetch tsp = new
TopicStorePrefetch((Topic)destination, clientId, subscriberName, subscription);
tsp.setMaxBatchSize(getMaxBatchSize());
tsp.setSystemUsage(systemUsage);
topics.put(destination, tsp);
storePrefetches.add(tsp);
if (started) {
tsp.start();
- pendingCount += tsp.size();
}
}
}
@@ -124,14 +122,18 @@
* @return true if there are no pending messages
*/
public synchronized boolean isEmpty() {
- return pendingCount <= 0;
+ for (PendingMessageCursor tsp : storePrefetches) {
+ if( !tsp.isEmpty() )
+ return false;
+ }
+ return true;
}
public boolean isEmpty(Destination destination) {
boolean result = true;
TopicStorePrefetch tsp = topics.get(destination);
if (tsp != null) {
- result = tsp.size() <= 0;
+ result = tsp.isEmpty();
}
return result;
}
@@ -151,7 +153,6 @@
if (node != null) {
Message msg = node.getMessage();
if (started) {
- pendingCount++;
if (!msg.isPersistent()) {
nonPersistent.addMessageLast(node);
}
@@ -171,7 +172,6 @@
}
public synchronized void clear() {
- pendingCount = 0;
nonPersistent.clear();
for (PendingMessageCursor tsp : storePrefetches) {
tsp.clear();
@@ -179,7 +179,7 @@
}
public synchronized boolean hasNext() {
- boolean result = pendingCount > 0;
+ boolean result = true;
if (result) {
try {
currentCursor = getNextCursor();
@@ -201,14 +201,12 @@
if (currentCursor != null) {
currentCursor.remove();
}
- pendingCount--;
}
public synchronized void remove(MessageReference node) {
if (currentCursor != null) {
currentCursor.remove(node);
}
- pendingCount--;
}
public synchronized void reset() {
@@ -226,6 +224,10 @@
}
public int size() {
+ int pendingCount=0;
+ for (PendingMessageCursor tsp : storePrefetches) {
+ pendingCount += tsp.size();
+ }
return pendingCount;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
Mon Oct 1 13:02:18 2007
@@ -17,12 +17,15 @@
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
+import java.util.Iterator;
import java.util.LinkedList;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
+import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.commons.logging.Log;
@@ -44,16 +47,19 @@
private Destination regionDestination;
private MessageId firstMessageId;
private MessageId lastMessageId;
- private int pendingCount;
+ private boolean batchResetNeeded = true;
+ private boolean storeMayHaveMoreMessages = true;
private boolean started;
+ private final Subscription subscription;
/**
* @param topic
* @param clientId
* @param subscriberName
*/
- public TopicStorePrefetch(Topic topic, String clientId, String
subscriberName) {
+ public TopicStorePrefetch(Topic topic, String clientId, String
subscriberName, Subscription subscription) {
this.regionDestination = topic;
+ this.subscription = subscription;
this.store = (TopicMessageStore)topic.getMessageStore();
this.clientId = clientId;
this.subscriberName = subscriberName;
@@ -62,13 +68,7 @@
public synchronized void start() {
if (!started) {
started = true;
- pendingCount = getStoreSize();
- try {
- fillBatch();
- } catch (Exception e) {
- LOG.error("Failed to fill batch", e);
- throw new RuntimeException(e);
- }
+ safeFillBatch();
}
}
@@ -84,11 +84,13 @@
* @return true if there are no pendingCount messages
*/
public synchronized boolean isEmpty() {
- return pendingCount <= 0;
+ safeFillBatch();
+ return batchList.isEmpty();
}
public synchronized int size() {
- return getPendingCount();
+ safeFillBatch();
+ return batchList.size();
}
public synchronized void addMessageLast(MessageReference node) throws
Exception {
@@ -98,7 +100,7 @@
}
lastMessageId = node.getMessageId();
node.decrementReferenceCount();
- pendingCount++;
+ storeMayHaveMoreMessages=true;
}
}
@@ -108,20 +110,18 @@
firstMessageId = node.getMessageId();
}
node.decrementReferenceCount();
- pendingCount++;
+ storeMayHaveMoreMessages=true;
}
}
public synchronized void remove() {
- pendingCount--;
}
public synchronized void remove(MessageReference node) {
- pendingCount--;
}
public synchronized void clear() {
- pendingCount = 0;
+ gc();
}
public synchronized boolean hasNext() {
@@ -130,27 +130,17 @@
public synchronized MessageReference next() {
Message result = null;
- if (!isEmpty()) {
- if (batchList.isEmpty()) {
- try {
- fillBatch();
- } catch (final Exception e) {
- LOG.error("Failed to fill batch", e);
- throw new RuntimeException(e);
- }
- if (batchList.isEmpty()) {
- return null;
- }
- }
- if (!batchList.isEmpty()) {
- result = batchList.removeFirst();
- if (lastMessageId != null) {
- if (result.getMessageId().equals(lastMessageId)) {
- // pendingCount=0;
- }
+ safeFillBatch();
+ if (batchList.isEmpty()) {
+ return null;
+ } else {
+ result = batchList.removeFirst();
+ if (lastMessageId != null) {
+ if (result.getMessageId().equals(lastMessageId)) {
+ // pendingCount=0;
}
- result.setRegionDestination(regionDestination);
}
+ result.setRegionDestination(regionDestination);
}
return result;
}
@@ -163,12 +153,16 @@
}
public synchronized boolean recoverMessage(Message message) throws
Exception {
- message.setRegionDestination(regionDestination);
- // only increment if count is zero (could have been cached)
- if (message.getReferenceCount() == 0) {
- message.incrementReferenceCount();
+ MessageEvaluationContext messageEvaluationContext = new
MessageEvaluationContext();
+ messageEvaluationContext.setMessageReference(message);
+ if( subscription.matches(message, messageEvaluationContext) ) {
+ message.setRegionDestination(regionDestination);
+ // only increment if count is zero (could have been cached)
+ if (message.getReferenceCount() == 0) {
+ message.incrementReferenceCount();
+ }
+ batchList.addLast(message);
}
- batchList.addLast(message);
return true;
}
@@ -178,38 +172,43 @@
}
// implementation
+ protected void safeFillBatch() {
+ try {
+ fillBatch();
+ } catch (Exception e) {
+ LOG.error("Failed to fill batch", e);
+ throw new RuntimeException(e);
+ }
+ }
+
protected synchronized void fillBatch() throws Exception {
- if (!isEmpty()) {
+ if( batchResetNeeded ) {
+ store.resetBatching(clientId, subscriberName);
+ batchResetNeeded=false;
+ storeMayHaveMoreMessages=true;
+ }
+
+ while( batchList.isEmpty() && storeMayHaveMoreMessages ) {
store.recoverNextMessages(clientId, subscriberName, maxBatchSize,
this);
- if (firstMessageId != null) {
- int pos = 0;
- for (Message msg : batchList) {
- if (msg.getMessageId().equals(firstMessageId)) {
- firstMessageId = null;
- break;
- }
- pos++;
- }
- if (pos > 0) {
- for (int i = 0; i < pos && !batchList.isEmpty(); i++) {
- batchList.removeFirst();
- }
- if (batchList.isEmpty()) {
- LOG.debug("Refilling batch - haven't got past first
message = " + firstMessageId);
- fillBatch();
+ if( batchList.isEmpty() ) {
+ storeMayHaveMoreMessages = false;
+ } else {
+ if (firstMessageId != null) {
+ int pos = 0;
+ for (Iterator<Message> iter = batchList.iterator();
iter.hasNext();) {
+ Message msg = iter.next();
+ if (msg.getMessageId().equals(firstMessageId)) {
+ firstMessageId = null;
+ break;
+ } else {
+ iter.remove();
+ }
}
}
}
}
}
- protected synchronized int getPendingCount() {
- if (pendingCount <= 0) {
- pendingCount = getStoreSize();
- }
- return pendingCount;
- }
-
protected synchronized int getStoreSize() {
try {
return store.getMessageCount(clientId, subscriberName);
@@ -224,6 +223,7 @@
msg.decrementReferenceCount();
}
batchList.clear();
+ batchResetNeeded = true;
}
public String toString() {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
Mon Oct 1 13:02:18 2007
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.region.policy;
+import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.kaha.Store;
@@ -39,7 +40,7 @@
* @param maxBatchSize
* @return the Pending Message cursor
*/
- public PendingMessageCursor getSubscriberPendingMessageCursor(String
clientId, String name, Store tmpStorage, int maxBatchSize) {
+ public PendingMessageCursor getSubscriberPendingMessageCursor(String
clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) {
return new FilePendingMessageCursor(name, tmpStorage);
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java
Mon Oct 1 13:02:18 2007
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.region.policy;
+import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.kaha.Store;
@@ -36,5 +37,5 @@
* @param maxBatchSize
* @return the Pending Message cursor
*/
- PendingMessageCursor getSubscriberPendingMessageCursor(String clientId,
String name, Store tmpStorage, int maxBatchSize);
+ PendingMessageCursor getSubscriberPendingMessageCursor(String clientId,
String name, Store tmpStorage, int maxBatchSize, Subscription sub);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Mon Oct 1 13:02:18 2007
@@ -116,7 +116,7 @@
String subName = sub.getSubscriptionName();
int prefetch = sub.getPrefetchSize();
if (pendingDurableSubscriberPolicy != null) {
- PendingMessageCursor cursor =
pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(clientId,
subName, broker.getTempDataStore(), prefetch);
+ PendingMessageCursor cursor =
pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(clientId,
subName, broker.getTempDataStore(), prefetch, sub);
cursor.setSystemUsage(memoryManager);
sub.setPending(cursor);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
Mon Oct 1 13:02:18 2007
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.region.policy;
+import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
import org.apache.activemq.kaha.Store;
@@ -40,7 +41,7 @@
* @param maxBatchSize
* @return the Pending Message cursor
*/
- public PendingMessageCursor getSubscriberPendingMessageCursor(String
clientId, String name, Store tmpStorage, int maxBatchSize) {
- return new StoreDurableSubscriberCursor(clientId, name, tmpStorage,
maxBatchSize);
+ public PendingMessageCursor getSubscriberPendingMessageCursor(String
clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) {
+ return new StoreDurableSubscriberCursor(clientId, name, tmpStorage,
maxBatchSize, sub);
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
Mon Oct 1 13:02:18 2007
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.region.policy;
+import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.kaha.Store;
@@ -38,7 +39,7 @@
* @param maxBatchSize
* @return the Pending Message cursor
*/
- public PendingMessageCursor getSubscriberPendingMessageCursor(String
clientId, String name, Store tmpStorage, int maxBatchSize) {
+ public PendingMessageCursor getSubscriberPendingMessageCursor(String
clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) {
return new VMPendingMessageCursor();
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
Mon Oct 1 13:02:18 2007
@@ -73,11 +73,8 @@
}
protected boolean recoverMessage(MessageRecoveryListener listener, Message
msg) throws Exception {
- if (listener.hasSpace()) {
- listener.recoverMessage(msg);
- return true;
- }
- return false;
+ listener.recoverMessage(msg);
+ return listener.hasSpace();
}
public void removeMessage(ConnectionContext context, MessageAck ack)
throws IOException {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
Mon Oct 1 13:02:18 2007
@@ -64,11 +64,8 @@
protected final boolean recoverReference(MessageRecoveryListener listener,
ReferenceRecord record)
throws Exception {
- if (listener.hasSpace()) {
- listener.recoverMessageReference(new
MessageId(record.getMessageId()));
- return true;
- }
- return false;
+ listener.recoverMessageReference(new MessageId(record.getMessageId()));
+ return listener.hasSpace();
}
public synchronized void recover(MessageRecoveryListener listener) throws
Exception {
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java
Mon Oct 1 13:02:18 2007
@@ -284,7 +284,7 @@
// The we should get the messages.
for (int i = 0; i < 4; i++) {
Message m2 = receiveMessage(connection2);
- assertNotNull(m2);
+ assertNotNull("Did not get message "+i, m2);
}
assertNoMessagesLeft(connection2);
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java?rev=581053&r1=581052&r2=581053&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java
Mon Oct 1 13:02:18 2007
@@ -77,7 +77,8 @@
consumer = getConsumer(consumerConnection);
List<Message> consumerList = new ArrayList<Message>();
for (int i = 0; i < MESSAGE_COUNT; i++) {
- Message msg = consumer.receive();
+ Message msg = consumer.receive(1000*5);
+ assertNotNull("Message "+i+" was missing.", msg);
consumerList.add(msg);
}
assertEquals(senderList, consumerList);
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java?rev=581053&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java
Mon Oct 1 13:02:18 2007
@@ -0,0 +1,163 @@
+/* ====================================================================
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================== */
+
+package org.apache.activemq.bugs.amq1095;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQTopic;
+
+/**
+ * <p>
+ * Common functionality for ActiveMQ test cases.
+ * </p>
+ *
+ * @author Rainer Klute <a
+ * href="mailto:[EMAIL PROTECTED]"><[EMAIL PROTECTED]></a>
+ * @since 2007-08-10
+ * @version $Id: ActiveMQTestCase.java 12 2007-08-14 12:02:02Z rke $
+ */
+public class ActiveMQTestCase extends TestCase
+{
+ private Context context;
+ private BrokerService broker;
+ protected Connection connection;
+ protected Destination destination;
+ private List<MessageConsumer> consumersToEmpty = new
LinkedList<MessageConsumer>();
+ protected final long RECEIVE_TIMEOUT = 500;
+
+
+ /** <p>Constructor</p> */
+ public ActiveMQTestCase()
+ {}
+
+ /** <p>Constructor</p>
+ * @param name the test case's name
+ */
+ public ActiveMQTestCase(final String name)
+ {
+ super(name);
+ }
+
+ /**
+ * <p>Sets up the JUnit testing environment.
+ */
+ protected void setUp()
+ {
+ URI uri;
+ try
+ {
+ /* Copy all system properties starting with "java.naming." to the
initial context. */
+ final Properties systemProperties = System.getProperties();
+ final Properties jndiProperties = new Properties();
+ for (final Iterator i = systemProperties.keySet().iterator();
i.hasNext();)
+ {
+ final String key = (String) i.next();
+ if (key.startsWith("java.naming.") || key.startsWith("topic.")
||
+ key.startsWith("queue."))
+ {
+ final String value = (String) systemProperties.get(key);
+ jndiProperties.put(key, value);
+ }
+ }
+ context = new InitialContext(jndiProperties);
+ uri = new
URI("xbean:org/apache/activemq/bugs/amq1095/activemq.xml");
+ broker = BrokerFactory.createBroker(uri);
+ broker.start();
+ }
+ catch (Exception ex)
+ {
+ throw new RuntimeException(ex);
+ }
+
+ final ConnectionFactory connectionFactory;
+ try
+ {
+ /* Lookup the connection factory. */
+ connectionFactory = (ConnectionFactory)
context.lookup("TopicConnectionFactory");
+
+ destination = new ActiveMQTopic("TestTopic");
+
+ /* Create a connection: */
+ connection = connectionFactory.createConnection();
+ connection.setClientID("sampleClientID");
+ }
+ catch (JMSException ex1)
+ {
+ ex1.printStackTrace();
+ Assert.fail(ex1.toString());
+ }
+ catch (NamingException ex2) {
+ ex2.printStackTrace();
+ Assert.fail(ex2.toString());
+ }
+ catch (Throwable ex3) {
+ ex3.printStackTrace();
+ Assert.fail(ex3.toString());
+ }
+ }
+
+
+ /**
+ * <p>
+ * Tear down the testing environment by receiving any messages that might
be
+ * left in the topic after a failure and shutting down the broker properly.
+ * This is quite important for subsequent test cases that assume the topic
+ * to be empty.
+ * </p>
+ */
+ protected void tearDown() throws Exception {
+ TextMessage msg;
+ for (final Iterator i = consumersToEmpty.iterator(); i.hasNext();)
+ {
+ final MessageConsumer consumer = (MessageConsumer) i.next();
+ if (consumer != null)
+ do
+ msg = (TextMessage) consumer.receive(RECEIVE_TIMEOUT);
+ while (msg != null);
+ }
+ if (connection != null) {
+ connection.stop();
+ }
+ broker.stop();
+ }
+
+ protected void registerToBeEmptiedOnShutdown(final MessageConsumer
consumer)
+ {
+ consumersToEmpty.add(consumer);
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/ActiveMQTestCase.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/MessageSelectorTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/MessageSelectorTest.java?rev=581053&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/MessageSelectorTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/MessageSelectorTest.java
Mon Oct 1 13:02:18 2007
@@ -0,0 +1,230 @@
+/* ====================================================================
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================== */
+
+package org.apache.activemq.bugs.amq1095;
+
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import junit.framework.Assert;
+
+
+/**
+ * <p>
+ * Test cases for various ActiveMQ functionalities.
+ * </p>
+ *
+ * <ul>
+ * <li>
+ * <p>
+ * Durable subscriptions are used.
+ * </p>
+ * </li>
+ * <li>
+ * <p>
+ * The Kaha persistence manager is used.
+ * </p>
+ * </li>
+ * <li>
+ * <p>
+ * An already existing Kaha directory is used. Everything runs fine if the
+ * ActiveMQ broker creates a new Kaha directory.
+ * </p>
+ * </li>
+ * </ul>
+ *
+ * @author Rainer Klute <a
+ * href="mailto:[EMAIL PROTECTED]"><[EMAIL PROTECTED]></a>
+ * @since 2007-08-09
+ * @version $Id: MessageSelectorTest.java 12 2007-08-14 12:02:02Z rke $
+ */
+public class MessageSelectorTest extends ActiveMQTestCase {
+
+ private MessageConsumer consumer1;
+ private MessageConsumer consumer2;
+
+ /** <p>Constructor</p> */
+ public MessageSelectorTest()
+ {}
+
+ /** <p>Constructor</p>
+ * @param name the test case's name
+ */
+ public MessageSelectorTest(final String name)
+ {
+ super(name);
+ }
+
+ /**
+ * <p>
+ * Tests whether message selectors work for durable subscribers.
+ * </p>
+ */
+ public void testMessageSelectorForDurableSubscribersRunA()
+ {
+ runMessageSelectorTest(true);
+ }
+
+ /**
+ * <p>
+ * Tests whether message selectors work for durable subscribers.
+ * </p>
+ */
+ public void testMessageSelectorForDurableSubscribersRunB()
+ {
+ runMessageSelectorTest(true);
+ }
+
+ /**
+ * <p>
+ * Tests whether message selectors work for non-durable subscribers.
+ * </p>
+ */
+ public void testMessageSelectorForNonDurableSubscribers()
+ {
+ runMessageSelectorTest(false);
+ }
+
+ /**
+ * <p>
+ * Tests whether message selectors work. This is done by sending two
+ * messages to a topic. Both have an int property with different values.
Two
+ * subscribers use message selectors to receive the messages. Each one
+ * should receive exactly one of the messages.
+ * </p>
+ */
+ private void runMessageSelectorTest(final boolean isDurableSubscriber)
+ {
+ try
+ {
+ final String PROPERTY_CONSUMER = "consumer";
+ final String CONSUMER_1 = "Consumer 1";
+ final String CONSUMER_2 = "Consumer 2";
+ final String MESSAGE_1 = "Message to " + CONSUMER_1;
+ final String MESSAGE_2 = "Message to " + CONSUMER_2;
+
+ assertNotNull(connection);
+ assertNotNull(destination);
+
+ final Session producingSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer producer =
producingSession.createProducer(destination);
+
+ final Session consumingSession1 = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ final Session consumingSession2 = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ if (isDurableSubscriber)
+ {
+ consumer1 = consumingSession1.createDurableSubscriber
+ ((Topic) destination, CONSUMER_1, PROPERTY_CONSUMER + " =
1", false);
+ consumer2 = consumingSession2.createDurableSubscriber
+ ((Topic) destination, CONSUMER_2, PROPERTY_CONSUMER + " =
2", false);
+ }
+ else
+ {
+ consumer1 = consumingSession1.createConsumer(destination,
PROPERTY_CONSUMER + " = 1");
+ consumer2 = consumingSession2.createConsumer(destination,
PROPERTY_CONSUMER + " = 2");
+ }
+ registerToBeEmptiedOnShutdown(consumer1);
+ registerToBeEmptiedOnShutdown(consumer2);
+
+ connection.start();
+
+ TextMessage msg1;
+ TextMessage msg2;
+ int propertyValue;
+ String contents;
+
+ /* Try to receive any messages from the consumers. There shouldn't
be any yet. */
+ msg1 = (TextMessage) consumer1.receive(RECEIVE_TIMEOUT);
+ if (msg1 != null)
+ {
+ final StringBuffer msg = new StringBuffer("The consumer read a
message that was left over from a former ActiveMQ broker run.");
+ propertyValue = msg1.getIntProperty(PROPERTY_CONSUMER);
+ contents = msg1.getText();
+ if (propertyValue != 1) // Is the property value as expected?
+ {
+ msg.append(" That message does not match the consumer's
message selector.");
+ fail(msg.toString());
+ }
+ assertEquals(1, propertyValue);
+ assertEquals(MESSAGE_1, contents);
+ }
+ msg2 = (TextMessage) consumer2.receive(RECEIVE_TIMEOUT);
+ if (msg2 != null)
+ {
+ final StringBuffer msg = new StringBuffer("The consumer read a
message that was left over from a former ActiveMQ broker run.");
+ propertyValue = msg2.getIntProperty(PROPERTY_CONSUMER);
+ contents = msg2.getText();
+ if (propertyValue != 2) // Is the property value as expected?
+ {
+ msg.append(" That message does not match the consumer's
message selector.");
+ fail(msg.toString());
+ }
+ assertEquals(2, propertyValue);
+ assertEquals(MESSAGE_2, contents);
+ }
+
+ /* Send two messages. Each is targeted at one of the consumers. */
+ TextMessage msg;
+ msg = producingSession.createTextMessage();
+ msg.setText(MESSAGE_1);
+ msg.setIntProperty(PROPERTY_CONSUMER, 1);
+ producer.send(msg);
+
+ msg = producingSession.createTextMessage();
+ msg.setText(MESSAGE_2);
+ msg.setIntProperty(PROPERTY_CONSUMER, 2);
+ producer.send(msg);
+
+ /* Receive the messages that have just been sent. */
+
+ /* Use consumer 1 to receive one of the messages. The receive()
+ * method is called twice to make sure there is nothing else in
+ * stock for this consumer. */
+ msg1 = (TextMessage) consumer1.receive(RECEIVE_TIMEOUT);
+ assertNotNull(msg1);
+ propertyValue = msg1.getIntProperty(PROPERTY_CONSUMER);
+ contents = msg1.getText();
+ assertEquals(1, propertyValue);
+ assertEquals(MESSAGE_1, contents);
+ msg1 = (TextMessage) consumer1.receive(RECEIVE_TIMEOUT);
+ assertNull(msg1);
+
+ /* Use consumer 2 to receive the other message. The receive()
+ * method is called twice to make sure there is nothing else in
+ * stock for this consumer. */
+ msg2 = (TextMessage) consumer2.receive(RECEIVE_TIMEOUT);
+ assertNotNull(msg2);
+ propertyValue = msg2.getIntProperty(PROPERTY_CONSUMER);
+ contents = msg2.getText();
+ assertEquals(2, propertyValue);
+ assertEquals(MESSAGE_2, contents);
+ msg2 = (TextMessage) consumer2.receive(RECEIVE_TIMEOUT);
+ assertNull(msg2);
+ }
+ catch (JMSException ex)
+ {
+ ex.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/MessageSelectorTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1095/MessageSelectorTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml?rev=581053&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml
(added)
+++
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml
Mon Oct 1 13:02:18 2007
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans>
+
+ <!-- Broker configuration for the AMQ-1095 test cases; message persistence is switched on. -->
+ <broker brokerName="localhost" xmlns="http://activemq.org/config/1.0"
persistent="true">
+
+ <!-- Messages are stored through the Kaha persistence adapter in the "kahadir" directory. -->
+ <!-- NOTE(review): the path looks relative to the working directory, so the store presumably survives between test runs - confirm. -->
+ <persistenceAdapter>
+ <kahaPersistenceAdapter directory="file:kahadir"
maxDataFileLength="200000"/>
+ </persistenceAdapter>
+
+ <!-- Destinations declared up front: one queue named "unused" and the topic "activemq.TestTopic". -->
+ <destinations>
+ <queue physicalName="unused"/>
+ <topic physicalName="activemq.TestTopic"/>
+ </destinations>
+
+ </broker>
+
+</beans>
\ No newline at end of file
Propchange:
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange:
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/bugs/amq1095/activemq.xml
------------------------------------------------------------------------------
svn:mime-type = text/xml