This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 4c34a49132 Prevent cursor from using more than 100% temp store (#2165)
4c34a49132 is described below
commit 4c34a49132de68e8427ca011fbe8bccb218795e1
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Thu Jul 2 10:14:15 2026 -0400
Prevent cursor from using more than 100% temp store (#2165)
This fixes a bug in FilePendingMessageCursor that could cause the
temporary store to exceed 100% usage. The cursor now properly checks the
temporary usage limits when writing messages from memory to the
temporary store. If the store is full, the messages are now discarded:
a discarded advisory is fired (if enabled), and the messages are sent to
the DLQ if the processNonPersistent flag is true. This commit also fixes
an issue, noticed while working on this fix, where Topic subscriptions
could send expired messages to the DLQ twice.
---
.../activemq/broker/region/BaseDestination.java | 7 +
.../activemq/broker/region/TopicSubscription.java | 12 +-
.../region/cursors/FilePendingMessageCursor.java | 225 ++++++++++++++-------
.../cursors/StoreDurableSubscriberCursor.java | 3 +-
...ndingDurableSubscriberMessageStoragePolicy.java | 2 +-
.../FilePendingSubscriberMessageStoragePolicy.java | 2 +-
.../apache/activemq/advisory/AdvisoryTests.java | 15 +-
.../FilePendingMessageCursorTestSupport.java | 208 ++++++++++++++++++-
8 files changed, 390 insertions(+), 84 deletions(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index e98156fa48..3588902493 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -31,6 +31,7 @@ import
org.apache.activemq.broker.region.policy.MessageInterceptorStrategy;
import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
@@ -578,6 +579,12 @@ public abstract class BaseDestination implements
Destination {
if (advisoryForDiscardingMessages) {
broker.messageDiscarded(context, sub, messageReference);
}
+ // We need to send to the DLQ because broker.messageDiscarded() will
not do that because it's
+ // optionally enabled and off by default. This is different than
expiration handling because
+ // broker.messageExpired() does send to the DLQ
+ final ConsumerInfo info = sub != null ? sub.getConsumerInfo() : null;
+ final String poisonCause = info != null ? "Subscription discard. ID:"
+ info.getConsumerId() : "Message discarded";
+ broker.sendToDeadLetterQueue(context, messageReference, sub, new
Throwable(poisonCause));
}
/**
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index 62659a6d5b..733476b758 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -82,7 +82,7 @@ public class TopicSubscription extends AbstractSubscription {
if (info.getDestination().isTemporary() ||
broker.getTempDataStore()==null ) {
this.matched = new VMPendingMessageCursor(false);
} else {
- this.matched = new
FilePendingMessageCursor(broker,matchedName,false);
+ this.matched = new
FilePendingMessageCursor(broker,matchedName,false, this);
}
this.scheduler = broker.getScheduler();
@@ -759,19 +759,25 @@ public class TopicSubscription extends
AbstractSubscription {
destination.getDestinationStatistics().getNetworkDequeues().increment();
}
}
- Destination dest = (Destination) message.getRegionDestination();
+
+ final Destination dest = (Destination)
message.getRegionDestination();
+ // This should not be null
if (dest != null) {
//If discard is due to expiration then use the
messageExpired() callback
if (expired) {
LOG.debug("{}, expiring message {}", this, message);
+ // This callback will also send messages to the DLQ
dest.messageExpired(getContext(), this, message);
} else {
LOG.debug("{}, discarding message {}", this, message);
discarded.incrementAndGet();
+ // This callback will also send messages to the DLQ
dest.messageDiscarded(getContext(), this, message);
}
+ } else {
+ LOG.debug("Message {} regionDestination unexpectedly unset
during discard on {}",
+ message.getMessageId(), info.getConsumerId());
}
- broker.getRoot().sendToDeadLetterQueue(getContext(), message,
this, new Throwable("TopicSubDiscard. ID:" + info.getConsumerId()));
} finally {
discarding = false;
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
index 9bb93fbe79..f413d598d5 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
@@ -17,10 +17,12 @@
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
-import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -30,6 +32,7 @@ import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.IndirectMessageReference;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.Message;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.PList;
@@ -54,9 +57,10 @@ public class FilePendingMessageCursor extends
AbstractPendingMessageCursor imple
private static final AtomicLong NAME_COUNT = new AtomicLong();
protected Broker broker;
+ private final Subscription sub;
private final PListStore store;
private final String name;
- private PendingList memoryList;
+ private final PendingList memoryList;
private PList diskList;
private Iterator<MessageReference> iter;
private Destination regionDestination;
@@ -65,12 +69,16 @@ public class FilePendingMessageCursor extends
AbstractPendingMessageCursor imple
private final AtomicBoolean started = new AtomicBoolean();
private final WireFormat wireFormat = new OpenWireFormat();
+ enum DiscardType {
+ DISCARD, EXPIRED
+ }
+
/**
* @param broker
* @param name
* @param prioritizedMessages
*/
- public FilePendingMessageCursor(Broker broker, String name, boolean
prioritizedMessages) {
+ public FilePendingMessageCursor(Broker broker, String name, boolean
prioritizedMessages, Subscription sub) {
super(prioritizedMessages);
if (this.prioritizedMessages) {
this.memoryList = new PrioritizedPendingList();
@@ -78,12 +86,17 @@ public class FilePendingMessageCursor extends
AbstractPendingMessageCursor imple
this.memoryList = new OrderedPendingList();
}
this.broker = broker;
+ this.sub = sub;
// the store can be null if the BrokerService has persistence
// turned off
this.store = broker.getTempDataStore();
this.name = NAME_COUNT.incrementAndGet() + "_" + name;
}
+ public FilePendingMessageCursor(Broker broker, String name, boolean
prioritizedMessages) {
+ this(broker, name, prioritizedMessages, null);
+ }
+
@Override
public void start() throws Exception {
if (started.compareAndSet(false, true)) {
@@ -145,18 +158,25 @@ public class FilePendingMessageCursor extends
AbstractPendingMessageCursor imple
@Override
public synchronized void release() {
- iterating = false;
- if (iter instanceof DiskIterator) {
- ((DiskIterator)iter).release();
- };
- if (flushRequired) {
- flushRequired = false;
- if (!hasSpace()) {
- flushToDisk();
+ Map<MessageReference, DiscardType> discardMap = null;
+
+ synchronized (this) {
+ iterating = false;
+ if (iter instanceof DiskIterator) {
+ ((DiskIterator)iter).release();
+ };
+ if (flushRequired) {
+ flushRequired = false;
+ if (!hasSpace()) {
+ discardMap = flushToDisk();
+ }
}
+ // ensure any memory ref is released
+ iter = null;
}
- // ensure any memory ref is released
- iter = null;
+
+ // process outside of the lock
+ discardMessages(discardMap);
}
@Override
@@ -207,17 +227,17 @@ public class FilePendingMessageCursor extends
AbstractPendingMessageCursor imple
*/
@Override
public boolean tryAddMessageLast(MessageReference node, long maxWaitTime)
throws Exception {
- // Discarding expired message should be done outside of synchronized
section (deadlock, see AMQ-5785)
- final List<MessageReference> expiredMessages = new ArrayList<>();
- final boolean added = tryAddMessageLastInternal(node, maxWaitTime,
expiredMessages);
- for (MessageReference expiredMessage : expiredMessages) {
- discardExpiredMessage(expiredMessage);
- }
+ // Discarding expired message should be done outside the synchronized
section (deadlock, see AMQ-5785)
+ final Map<MessageReference, DiscardType> discardMap = new
LinkedHashMap<>();
+ final boolean added = tryAddMessageLastInternal(node, maxWaitTime,
discardMap);
+ discardMessages(discardMap);
return added;
}
private synchronized boolean tryAddMessageLastInternal(MessageReference
node, long maxWaitTime,
-
List<MessageReference> expiredMessages) {
+
Map<MessageReference, DiscardType> discardMap) {
+ // ensure this is not null by mistake
+ Objects.requireNonNull(discardMap);
if (!node.isExpired()) {
try {
regionDestination = (Destination)
node.getMessage().getRegionDestination();
@@ -231,13 +251,13 @@ public class FilePendingMessageCursor extends
AbstractPendingMessageCursor imple
}
if (!hasSpace()) {
if (isDiskListEmpty()) {
- expireOldMessages(expiredMessages);
+ expireOldMessages(discardMap);
if (hasSpace()) {
memoryList.addMessageLast(node);
node.incrementReferenceCount();
return true;
} else {
- flushToDisk();
+ flushToDisk(discardMap);
}
}
}
@@ -253,10 +273,9 @@ public class FilePendingMessageCursor extends
AbstractPendingMessageCursor imple
throw new RuntimeException(e);
}
} else {
- expiredMessages.add(node);
+ discardMap.put(node, DiscardType.EXPIRED);
+ return true;
}
- //message expired
- return true;
}
/**
@@ -266,16 +285,13 @@ public class FilePendingMessageCursor extends
AbstractPendingMessageCursor imple
*/
@Override
public void addMessageFirst(MessageReference node) {
- // Discarding expired message should be done outside of synchronized
section (deadlock, see AMQ-5785)
- final List<MessageReference> expiredMessages =
addMessageFirstInternal(node);
- if (expiredMessages != null) {
- for (MessageReference expiredMessage : expiredMessages) {
- discardExpiredMessage(expiredMessage);
- }
- }
+ // Discarding expired message should be done outside the synchronized
section (deadlock, see AMQ-5785)
+ final Map<MessageReference, DiscardType> discardedMessages =
addMessageFirstInternal(node);
+ discardMessages(discardedMessages);
}
- private synchronized List<MessageReference>
addMessageFirstInternal(MessageReference node) {
+ private synchronized Map<MessageReference, DiscardType>
addMessageFirstInternal(MessageReference node) {
+ Map<MessageReference, DiscardType> discardMap = null;
if (!node.isExpired()) {
try {
regionDestination = (Destination)
node.getMessage().getRegionDestination();
@@ -284,18 +300,18 @@ public class FilePendingMessageCursor extends
AbstractPendingMessageCursor imple
memoryList.addMessageFirst(node);
node.incrementReferenceCount();
setCacheEnabled(true);
- return List.of();
+ return null;
}
}
if (!hasSpace()) {
if (isDiskListEmpty()) {
- List<MessageReference> expiredMessages =
expireOldMessages();
+ discardMap = expireOldMessages();
if (hasSpace()) {
memoryList.addMessageFirst(node);
node.incrementReferenceCount();
- return expiredMessages;
+ return discardMap;
} else {
- flushToDisk();
+ discardMap = flushToDisk(discardMap);
}
}
}
@@ -304,15 +320,22 @@ public class FilePendingMessageCursor extends
AbstractPendingMessageCursor imple
ByteSequence bs = getByteSequence(node.getMessage());
Object locator =
getDiskList().addFirst(node.getMessageId().toString(), bs);
node.getMessageId().setPlistLocator(locator);
-
} catch (Exception e) {
LOG.error("Caught an Exception adding a message: {} first to
FilePendingMessageCursor ", node, e);
throw new RuntimeException(e);
}
} else {
- return List.of(node);
+ return Map.of(node, DiscardType.EXPIRED);
+ }
+ return discardMap;
+ }
+
+ private void discardMessages(Map<MessageReference, DiscardType>
discardMap) {
+ if (discardMap != null) {
+ for (Entry<MessageReference, DiscardType> discardedMessage :
discardMap.entrySet()) {
+ discardMessage(discardedMessage.getKey(),
discardedMessage.getValue());
+ }
}
- return null;
}
/**
@@ -423,24 +446,22 @@ public class FilePendingMessageCursor extends
AbstractPendingMessageCursor imple
@Override
public void onUsageChanged(Usage usage, int oldPercentUsage, int
newPercentUsage) {
if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
- List<MessageReference> expiredMessages = null;
- synchronized (this) {
- if (!flushRequired && size() != 0) {
- flushRequired =true;
- if (!iterating) {
- expiredMessages = expireOldMessages();
- if (!hasSpace()) {
- flushToDisk();
- flushRequired = false;
+ Map<MessageReference, DiscardType> discardMap = null;
+ try {
+ synchronized (this) {
+ if (!flushRequired && size() != 0) {
+ flushRequired = true;
+ if (!iterating) {
+ discardMap = expireOldMessages();
+ if (!hasSpace()) {
+ discardMap = flushToDisk(discardMap);
+ flushRequired = false;
+ }
}
}
}
- }
-
- if (expiredMessages != null) {
- for (MessageReference node : expiredMessages) {
- discardExpiredMessage(node);
- }
+ } finally {
+ discardMessages(discardMap);
}
}
}
@@ -450,26 +471,34 @@ public class FilePendingMessageCursor extends
AbstractPendingMessageCursor imple
return true;
}
- private synchronized List<MessageReference> expireOldMessages() {
- final List<MessageReference> expired = new ArrayList<>();
- expireOldMessages(expired);
- return expired;
+ private synchronized Map<MessageReference, DiscardType>
expireOldMessages() {
+ return expireOldMessages(null);
}
- private synchronized void expireOldMessages(List<MessageReference>
expired) {
+ private synchronized Map<MessageReference, DiscardType>
expireOldMessages(Map<MessageReference, DiscardType> discardMap) {
if (!memoryList.isEmpty()) {
for (Iterator<MessageReference> iterator = memoryList.iterator();
iterator.hasNext();) {
MessageReference node = iterator.next();
if (node.isExpired()) {
- node.decrementReferenceCount();
- expired.add(node);
+ if (discardMap == null) {
+ discardMap = new LinkedHashMap<>();
+ }
+ // do not decrement ref count yet - we are removing from
the memoryList
+ // but keeping in memory for now by adding to the
discardMap. Decrement
+ // the count after processing the expiration
+ discardMap.put(node, DiscardType.EXPIRED);
iterator.remove();
}
}
}
+ return discardMap;
+ }
+
+ private synchronized Map<MessageReference, DiscardType> flushToDisk() {
+ return flushToDisk(null);
}
- protected synchronized void flushToDisk() {
+ private synchronized Map<MessageReference, DiscardType>
flushToDisk(Map<MessageReference, DiscardType> discardMap) {
if (!memoryList.isEmpty() && store != null) {
long start = 0;
if (LOG.isTraceEnabled()) {
@@ -480,24 +509,40 @@ public class FilePendingMessageCursor extends
AbstractPendingMessageCursor imple
}
for (Iterator<MessageReference> iterator = memoryList.iterator();
iterator.hasNext();) {
MessageReference node = iterator.next();
- node.decrementReferenceCount();
- ByteSequence bs;
try {
- bs = getByteSequence(node.getMessage());
- getDiskList().addLast(node.getMessageId().toString(), bs);
+ // only write to disk if we have space, else we just have
to drop the
+ // messages as they won't fit on disk and this cursor does
not support
+ // mixing messages in memory and on disk. These are
non-persistent messages
+ // so dropping them is ok if there's no space.
+ if (!getSystemUsage().getTempUsage().isFull()) {
+ ByteSequence bs = getByteSequence(node.getMessage());
+ getDiskList().addLast(node.getMessageId().toString(),
bs);
+ // decrement if not discarded as node will be removed
from memory
+ node.decrementReferenceCount();
+ } else {
+ if (discardMap == null) {
+ discardMap = new LinkedHashMap<>();
+ }
+ discardMap.put(node, DiscardType.DISCARD);
+ // Don't decrement reference count yet as we still
have the node
+ // in memory until discard loop is processed because
writing to
+ // disk could take a while
+ }
+ // Remove the node as soon as we write to disk
+ // This will ensure we keep the memory usage tracking
correct
+ iterator.remove();
} catch (IOException e) {
LOG.error("Failed to write to disk list", e);
throw new RuntimeException(e);
}
-
}
- memoryList.clear();
setCacheEnabled(false);
LOG.trace("{}, flushToDisk() done - {} ms {}",
name,
(System.currentTimeMillis() - start),
(systemUsage != null ? systemUsage.getMemoryUsage() : ""));
}
+ return discardMap;
}
protected boolean isDiskListEmpty() {
@@ -516,12 +561,44 @@ public class FilePendingMessageCursor extends
AbstractPendingMessageCursor imple
return diskList;
}
+ private void discardMessage(MessageReference reference, DiscardType
discardType) {
+ if (discardType == DiscardType.EXPIRED) {
+ discardExpiredMessage(reference);
+ } else {
+ discardMessage(reference);
+ }
+ }
+
private void discardExpiredMessage(MessageReference reference) {
LOG.debug("Discarding expired message {}", reference);
- if (reference.isExpired() && broker.isExpired(reference)) {
- ConnectionContext context = new ConnectionContext();
- context.setBroker(broker);
-
((Destination)reference.getRegionDestination()).messageExpired(context, null,
new IndirectMessageReference(reference.getMessage()));
+ try {
+ if (reference.isExpired() && broker.isExpired(reference)) {
+ ConnectionContext context = broker.getAdminConnectionContext();
+ ((Destination)
reference.getRegionDestination()).messageExpired(context, sub,
+ new IndirectMessageReference(reference.getMessage()));
+ }
+ } catch (Exception e) {
+ LOG.warn("Error discarding expired message {}",
reference.getMessageId());
+ LOG.debug(e.getMessage(), e);
+ } finally {
+ // we can now drop the reference count
+ reference.decrementReferenceCount();
+ }
+ }
+
+ private void discardMessage(MessageReference reference) {
+ try {
+ LOG.debug("Discarding message {} due to full temporary storage
{}", reference, systemUsage.getTempUsage());
+ ConnectionContext context = broker.getAdminConnectionContext();
+ // This will handle advisories if advisoryForDiscardingMessages is
enabled
+ // as well as DLQ processing
+
((Destination)reference.getRegionDestination()).messageDiscarded(context, sub,
new IndirectMessageReference(reference.getMessage()));
+ } catch (Exception e) {
+ LOG.warn("Error discarding message {}", reference.getMessageId());
+ LOG.debug(e.getMessage(), e);
+ } finally {
+ // we can now drop the reference count
+ reference.decrementReferenceCount();
}
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
index 510412b794..773848a5c6 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
@@ -64,7 +64,8 @@ public class StoreDurableSubscriberCursor extends
AbstractPendingMessageCursor {
this.clientId = clientId;
this.subscriberName = subscriberName;
if (broker.getBrokerService().isPersistent()) {
- this.nonPersistent = new FilePendingMessageCursor(broker,clientId
+ subscriberName,this.prioritizedMessages);
+ this.nonPersistent = new FilePendingMessageCursor(broker,clientId
+ subscriberName,this.prioritizedMessages,
+ subscription);
} else {
this.nonPersistent = new
VMPendingMessageCursor(this.prioritizedMessages);
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
index 0f714eb63c..5e8d7ed753 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
@@ -44,6 +44,6 @@ public class FilePendingDurableSubscriberMessageStoragePolicy
implements Pending
* @return the Pending Message cursor
*/
public PendingMessageCursor getSubscriberPendingMessageCursor(Broker
broker,String clientId, String name, int maxBatchSize, DurableTopicSubscription
sub) {
- return new
FilePendingMessageCursor(broker,name,AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker,
sub));
+ return new
FilePendingMessageCursor(broker,name,AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker,
sub), sub);
}
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java
index 2a42d0e64f..01cbcec42d 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java
@@ -43,6 +43,6 @@ public class FilePendingSubscriberMessageStoragePolicy
implements PendingSubscri
public PendingMessageCursor getSubscriberPendingMessageCursor(Broker
broker, String name, int maxBatchSize,
Subscription subs) {
return new FilePendingMessageCursor(broker, "PendingCursor:" + name,
AbstractPendingMessageCursor
- .isPrioritizedMessageSubscriber(broker, subs));
+ .isPrioritizedMessageSubscriber(broker, subs), subs);
}
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
index 1bfc74cdd9..d11d410605 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
@@ -432,6 +432,8 @@ public class AdvisoryTests {
Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic(dest);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
+ MessageConsumer dlqConsumer =
s.createConsumer(AdvisorySupport.getMessageDLQdAdvisoryTopic(dest));
+
// start throwing messages at the consumer
MessageProducer producer = s.createProducer(dest);
producer.setTimeToLive(ttl);
@@ -453,6 +455,9 @@ public class AdvisoryTests {
ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
+ // should have also gotten advisory for the DLQ
+ assertNotNull(dlqConsumer.receive(1000));
+
//This should be set
assertNotNull(message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL));
@@ -471,6 +476,7 @@ public class AdvisoryTests {
MessageConsumer consumer = s.createConsumer(dest);
MessageConsumer expiredAdvisoryConsumer =
s.createConsumer(AdvisorySupport.getExpiredMessageTopic(dest));
MessageConsumer discardedAdvisoryConsumer =
s.createConsumer(AdvisorySupport.getMessageDiscardedAdvisoryTopic(dest));
+ MessageConsumer dlqConsumer =
s.createConsumer(AdvisorySupport.getMessageDLQdAdvisoryTopic(dest));
// start throwing messages at the consumer
MessageProducer producer = s.createProducer(dest);
@@ -498,6 +504,9 @@ public class AdvisoryTests {
ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
+ // should have also gotten advisory for the DLQ
+ assertNotNull(dlqConsumer.receive(1000));
+
//This should be set
assertNotNull(message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL));
@@ -533,6 +542,7 @@ public class AdvisoryTests {
Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic(dest);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
+ MessageConsumer dlqConsumer =
s.createConsumer(AdvisorySupport.getMessageDLQdAdvisoryTopic(dest));
// start throwing messages at the consumer
MessageProducer producer = s.createProducer(dest);
producer.setTimeToLive(ttl);
@@ -552,6 +562,9 @@ public class AdvisoryTests {
ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
+ // should have also gotten advisory for the DLQ
+ assertNotNull(dlqConsumer.receive(1000));
+
//This should be set
assertNotNull(message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL));
@@ -561,7 +574,7 @@ public class AdvisoryTests {
}
@Test(timeout = 60000)
- public void testMessageDLQd() throws Exception {
+ public void testMessageDLQdOnDiscard() throws Exception {
ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
policy.setTopicPrefetch(2);
((ActiveMQConnection) connection).setPrefetchPolicy(policy);
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java
index 57f3a86087..23d769b7f2 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java
@@ -16,25 +16,51 @@
*/
package org.apache.activemq.broker.region.cursors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.IndirectMessageReference;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.PList;
import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.usage.TempUsage;
+import org.apache.activemq.usage.Usage;
import org.apache.activemq.util.ByteSequence;
import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-public class FilePendingMessageCursorTestSupport {
+public abstract class FilePendingMessageCursorTestSupport {
protected static final Logger LOG =
LoggerFactory.getLogger(FilePendingMessageCursorTestSupport.class);
protected BrokerService brokerService;
- protected FilePendingMessageCursor underTest;
+ protected FilePendingMessageCursor underTest;
+ private final AtomicInteger discarded = new AtomicInteger();
+ private final AtomicInteger dlq = new AtomicInteger();
+
+ @Before
+ public void setUp() throws Exception {
+ discarded.set(0);
+ dlq.set(0);
+ }
@After
public void stopBroker() throws Exception {
@@ -44,10 +70,31 @@ public class FilePendingMessageCursorTestSupport {
}
private void createBrokerWithTempStoreLimit() throws Exception {
+ createBrokerWithTempStoreLimit(1025*1024*15);
+ }
+
+ private void createBrokerWithTempStoreLimit(long limit) throws Exception {
brokerService = new BrokerService();
brokerService.setUseJmx(false);
SystemUsage usage = brokerService.getSystemUsage();
- usage.getTempUsage().setLimit(1025*1024*15);
+ usage.getTempUsage().setLimit(limit);
+ brokerService.setPlugins(new BrokerPlugin[] { new
BrokerPluginSupport() {
+ @Override
+ public void messageDiscarded(ConnectionContext context,
Subscription sub,
+ MessageReference messageReference) {
+ super.messageDiscarded(context, sub, messageReference);
+ discarded.incrementAndGet();
+ }
+
+ @Override
+ public boolean sendToDeadLetterQueue(ConnectionContext context,
+ MessageReference messageReference, Subscription
subscription,
+ Throwable poisonCause) {
+ dlq.incrementAndGet();
+ return super.sendToDeadLetterQueue(context, messageReference,
subscription,
+ poisonCause);
+ }
+ }});
brokerService.start();
// put something in the temp store to on demand initialise it
@@ -96,4 +143,159 @@ public class FilePendingMessageCursorTestSupport {
} catch (NullPointerException expected) {}
}
+ // This test will verify that when flushing message in memory to the
+ // temp store that the store will check usage and not keep writing when full
+ // If full, remaining messages will just get dropped
+ @Test(timeout=30000)
+ public void testFlushToDiskWhenTempStoreIsFull() throws Exception {
+ createBrokerWithTempStoreLimit(1024 * 1024);
+ SystemUsage usage = brokerService.getSystemUsage();
+ usage.getMemoryUsage().setLimit(1024 * 100);
+ Queue destination = new Queue(brokerService, new ActiveMQQueue("Q"),
null, new DestinationStatistics(), null);
+ destination.setAdvisoryForDiscardingMessages(true);
+ PList diskList = brokerService.getTempDataStore().getPList("temp");
+
+ // fill the temp store
+ int id = 0;
+ ByteSequence payload = new ByteSequence(new byte[1024]);
+ while (!usage.getTempUsage().isFull()) {
+ diskList.addLast("A-" + (++id), payload);
+ }
+
+ // Ensure that the temp store is full and get current usage
+ TempUsage tempUsage = usage.getTempUsage();
+ long existingUsage = tempUsage.getUsage();
+ assertTrue("temp store is full: %" + tempUsage.getPercentUsage(),
tempUsage.isFull());
+
+ underTest = new FilePendingMessageCursor(brokerService.getBroker(),
"test", false);
+ underTest.setSystemUsage(usage);
+
+ // At this point the memory cursor is empty
+ // This will write messages until it fills and then the cursor will try
+ // and dump to disk. All messages will just get dropped because there's no space.
+ int i = 0;
+ ActiveMQMessage mqMessage;
+ do {
+ mqMessage = new ActiveMQMessage();
+ mqMessage.setMessageId(new MessageId("1:2:3:" + i++));
+ mqMessage.setContent(new ByteSequence(new byte[1024]));
+ mqMessage.setRegionDestination(destination);
+ mqMessage.setMemoryUsage(usage.getMemoryUsage());
+ // This should timeout and return false when trying to write to disk because
+ // the cursor now checks if temp is full before trying to write
+ } while(underTest.tryAddMessageLast(new
IndirectMessageReference(mqMessage), 10));
+
+ // Verify that the usage didn't increase, and no messages were written to
+ // temp because it was already full
+ assertEquals(existingUsage, tempUsage.getUsage());
+ assertEquals(0, underTest.getDiskList().size());
+ // memory usage should be 0 because all the messages get dropped
+ assertEquals(0, usage.getMemoryUsage().getUsage());
+ // the discard callback should have been called for all the messages that
+ // were removed from the memory store because temp was full. This is
+ // i - 1 because the last message failed to process and timed out so it
+ // was not already in the memory store and was not discarded.
+ assertEquals(i - 1, discarded.get());
+
+ // all messages should have been sent for possible DLQ processing
+ // the messages are non-persistent so the callback would generally skip sending
+ // them to the DLQ because processNonPersistent is usually false, but this verifies
+ // the callback is at least called in case someone has enabled the flag
+ assertEquals(discarded.get(), dlq.get());
+ }
+
+ @Test(timeout=30000)
+ public void testFlushToDiskWhenTempStoreIsHalfFull() throws Exception {
+ createBrokerWithTempStoreLimit(1024 * 512);
+ SystemUsage usage = brokerService.getSystemUsage();
+ // set a memory usage higher than on disk limit for testing
+ usage.getMemoryUsage().setLimit(1024 * 1024);
+ Queue destination = new Queue(brokerService, new ActiveMQQueue("Q"),
null, new DestinationStatistics(), null);
+ destination.setAdvisoryForDiscardingMessages(true);
+ PList diskList = brokerService.getTempDataStore().getPList("temp");
+
+ // fill the temp store halfway
+ int id = 0;
+ ByteSequence payload = new ByteSequence(new byte[1024]);
+ while (!usage.getTempUsage().isFull(50)) {
+ diskList.addLast("A-" + (++id), payload);
+ }
+
+ // Ensure that the temp store is half full and get current usage
+ TempUsage tempUsage = usage.getTempUsage();
+ assertTrue("temp store is full: %" + tempUsage.getPercentUsage(),
tempUsage.isFull(50));
+
+ underTest = new FilePendingMessageCursor(brokerService.getBroker(),
"test", false);
+ underTest.setSystemUsage(usage);
+
+ // Start adding messages
+ int i = 0;
+ ActiveMQMessage mqMessage;
+ do {
+ mqMessage = new ActiveMQMessage();
+ mqMessage.setMessageId(new MessageId("1:2:3:" + i++));
+ mqMessage.setContent(new ByteSequence(new byte[1024]));
+ mqMessage.setMemoryUsage(usage.getMemoryUsage());
+ mqMessage.setRegionDestination(destination);
+ // This should timeout and return false when trying to write to disk when Temp is full
+ // and will cause the loop to stop
+ } while(underTest.tryAddMessageLast(new
IndirectMessageReference(mqMessage), 10));
+
+ // This should be false, there was not enough space for all the messages in memory but for some
+ // so the dumping from memory to disk should have written part of the messages
+ assertFalse(underTest.getDiskList().isEmpty());
+ // we should have discarded the remaining from memory
+ assertTrue(discarded.get() > 0);
+ // This is i - 1 because the last message failed to add on timeout.
+ // This verifies that the total messages added equals the number on disk + discarded
+ // because any excess now get discarded when temp store is full
+ assertEquals(i - 1, underTest.getDiskList().size() + discarded.get());
+ // memory usage should be 0 because all the messages get moved from memory
+ assertEquals(0, usage.getMemoryUsage().getUsage());
+ // discarded should equal possible DLQ
+ assertEquals(discarded.get(), dlq.get());
+ }
+
+ @Test(timeout=30000)
+ public void testFlushToDiskWhenTempStoreHasSpace() throws Exception {
+ // create with plenty of space
+ createBrokerWithTempStoreLimit(10 * 1024 * 1024);
+ SystemUsage usage = brokerService.getSystemUsage();
+ // set a memory usage lower than disk limit
+ usage.getMemoryUsage().setLimit(1024 * 1024);
+ Queue destination = new Queue(brokerService, new ActiveMQQueue("Q"),
null, new DestinationStatistics(), null);
+ destination.setAdvisoryForDiscardingMessages(true);
+ PList diskList = brokerService.getTempDataStore().getPList("temp");
+
+ // fill the temp store only 10% which gives plenty of space for messages later
+ int id = 0;
+ ByteSequence payload = new ByteSequence(new byte[1024]);
+ while (!usage.getTempUsage().isFull(10)) {
+ diskList.addLast("A-" + (++id), payload);
+ }
+ underTest = new FilePendingMessageCursor(brokerService.getBroker(),
"test", false);
+ underTest.setSystemUsage(usage);
+
+ // Start adding messages
+ int i = 0;
+ ActiveMQMessage mqMessage;
+ do {
+ mqMessage = new ActiveMQMessage();
+ mqMessage.setMessageId(new MessageId("1:2:3:" + i++));
+ mqMessage.setContent(new ByteSequence(new byte[1024]));
+ mqMessage.setMemoryUsage(usage.getMemoryUsage());
+ mqMessage.setRegionDestination(destination);
+ // Keep writing until disk list is no longer null which means we now dumped to disk
+ // because memory is full
+ } while(underTest.tryAddMessageLast(new
IndirectMessageReference(mqMessage), 10)
+ && underTest.getDiskList() != null);
+
+ // We had space so every message sent should exist on disk
+ assertEquals(i - 1, underTest.getDiskList().size());
+ // discarded/dlq should be 0
+ assertEquals(0, discarded.get());
+ assertEquals(0, dlq.get());
+ // memory usage should be 0 because all the messages get moved from memory
+ assertEquals(0, usage.getMemoryUsage().getUsage());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact