This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch activemq-6.2.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-6.2.x by this push:
new 2f6bde3c96 [6.2.x] Backport missing fixes for AMQ-9813 (#2174)
2f6bde3c96 is described below
commit 2f6bde3c960f0be79db6da379ce9aef34cccd49d
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Thu Jul 2 12:05:04 2026 -0400
[6.2.x] Backport missing fixes for AMQ-9813 (#2174)
* AMQ-9813 - fix wrong QueueSize for non-persistent message with TTL
- add "missing" invocation of discardExpiredMessage() method
into tryAddMessageLast(), addMessageFirst(), probably caused
in context of AMQ-5785
- use same "postponed" strategy (outside of synchronization) like was
already done in original commit (see onUsageChanged() method)
(cherry picked from commit 5c5eb335eefc2e0422b17c512eec7796f8ab08df)
* AMQ-9813 - Minor updates
Rework handling of expiration array list to avoid allocation when not
needed
(cherry picked from commit 602f1bc75cf961ae355f87742413621888f16a93)
---------
Co-authored-by: Radek Kraus <[email protected]>
---
.../region/cursors/FilePendingMessageCursor.java | 47 ++-
...FullDestinationMemoryMessageExpirationTest.java | 399 +++++++++++++++++++++
2 files changed, 435 insertions(+), 11 deletions(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
index ed51ce8881..9bb93fbe79 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
@@ -206,7 +206,18 @@ public class FilePendingMessageCursor extends
AbstractPendingMessageCursor imple
* @throws Exception
*/
@Override
- public synchronized boolean tryAddMessageLast(MessageReference node, long
maxWaitTime) throws Exception {
+ public boolean tryAddMessageLast(MessageReference node, long maxWaitTime)
throws Exception {
+ // Discarding expired message should be done outside of synchronized
section (deadlock, see AMQ-5785)
+ final List<MessageReference> expiredMessages = new ArrayList<>();
+ final boolean added = tryAddMessageLastInternal(node, maxWaitTime,
expiredMessages);
+ for (MessageReference expiredMessage : expiredMessages) {
+ discardExpiredMessage(expiredMessage);
+ }
+ return added;
+ }
+
+ private synchronized boolean tryAddMessageLastInternal(MessageReference
node, long maxWaitTime,
+
List<MessageReference> expiredMessages) {
if (!node.isExpired()) {
try {
regionDestination = (Destination)
node.getMessage().getRegionDestination();
@@ -220,7 +231,7 @@ public class FilePendingMessageCursor extends
AbstractPendingMessageCursor imple
}
if (!hasSpace()) {
if (isDiskListEmpty()) {
- expireOldMessages();
+ expireOldMessages(expiredMessages);
if (hasSpace()) {
memoryList.addMessageLast(node);
node.incrementReferenceCount();
@@ -242,7 +253,7 @@ public class FilePendingMessageCursor extends
AbstractPendingMessageCursor imple
throw new RuntimeException(e);
}
} else {
- discardExpiredMessage(node);
+ expiredMessages.add(node);
}
//message expired
return true;
@@ -254,7 +265,17 @@ public class FilePendingMessageCursor extends
AbstractPendingMessageCursor imple
* @param node
*/
@Override
- public synchronized void addMessageFirst(MessageReference node) {
+ public void addMessageFirst(MessageReference node) {
+ // Discarding expired message should be done outside of synchronized
section (deadlock, see AMQ-5785)
+ final List<MessageReference> expiredMessages =
addMessageFirstInternal(node);
+ if (expiredMessages != null) {
+ for (MessageReference expiredMessage : expiredMessages) {
+ discardExpiredMessage(expiredMessage);
+ }
+ }
+ }
+
+ private synchronized List<MessageReference>
addMessageFirstInternal(MessageReference node) {
if (!node.isExpired()) {
try {
regionDestination = (Destination)
node.getMessage().getRegionDestination();
@@ -263,16 +284,16 @@ public class FilePendingMessageCursor extends
AbstractPendingMessageCursor imple
memoryList.addMessageFirst(node);
node.incrementReferenceCount();
setCacheEnabled(true);
- return;
+ return List.of();
}
}
if (!hasSpace()) {
if (isDiskListEmpty()) {
- expireOldMessages();
+ List<MessageReference> expiredMessages =
expireOldMessages();
if (hasSpace()) {
memoryList.addMessageFirst(node);
node.incrementReferenceCount();
- return;
+ return expiredMessages;
} else {
flushToDisk();
}
@@ -289,8 +310,9 @@ public class FilePendingMessageCursor extends
AbstractPendingMessageCursor imple
throw new RuntimeException(e);
}
} else {
- discardExpiredMessage(node);
+ return List.of(node);
}
+ return null;
}
/**
@@ -429,7 +451,12 @@ public class FilePendingMessageCursor extends
AbstractPendingMessageCursor imple
}
private synchronized List<MessageReference> expireOldMessages() {
- List<MessageReference> expired = new ArrayList<MessageReference>();
+ final List<MessageReference> expired = new ArrayList<>();
+ expireOldMessages(expired);
+ return expired;
+ }
+
+ private synchronized void expireOldMessages(List<MessageReference>
expired) {
if (!memoryList.isEmpty()) {
for (Iterator<MessageReference> iterator = memoryList.iterator();
iterator.hasNext();) {
MessageReference node = iterator.next();
@@ -440,8 +467,6 @@ public class FilePendingMessageCursor extends
AbstractPendingMessageCursor imple
}
}
}
-
- return expired;
}
protected synchronized void flushToDisk() {
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/FullDestinationMemoryMessageExpirationTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/FullDestinationMemoryMessageExpirationTest.java
new file mode 100644
index 0000000000..170bee4564
--- /dev/null
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/FullDestinationMemoryMessageExpirationTest.java
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import jakarta.jms.Connection;
+import jakarta.jms.ConnectionFactory;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.Destination;
+import jakarta.jms.JMSException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Session;
+import jakarta.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FullDestinationMemoryMessageExpirationTest {
+ private static final Logger LOG =
LoggerFactory.getLogger(FullDestinationMemoryMessageExpirationTest.class);
+
+ private static final long DESTINATION_MEMORY_LIMIT = 1024 * 1024; // 1 MB
destination memory limit
+ private static final long BROKER_MEMORY_LIMIT = 64 *
DESTINATION_MEMORY_LIMIT; // Broker memory limit has to be bigger than
destination memory limit
+ private static final long BROKER_TEMP_USAGE_LIMIT = 64 *
DESTINATION_MEMORY_LIMIT;
+ private static final String BROKER_DATA_DIRECTORY = "target/test-classes/"
+
+
FullDestinationMemoryMessageExpirationTest.class.getName().replace('.', '/') +
+ "-activemq-data";
+ private static final String BROKER_URL = "vm://" +
BrokerService.DEFAULT_BROKER_NAME;
+ private static final String QUEUE_NAME = "NON_PERSISTENT_TEST";
+ private static final String MESSAGE_ID_PROPERTY_NAME = "MessageId";
+
+ private BrokerService brokerService;
+
+ @Before
+ public void setUp() throws Exception {
+ // Delete AMQ data directory
+ FileUtils.deleteDirectory(new File(BROKER_DATA_DIRECTORY));
+
+ // Configure/Start Broker
+ brokerService = new BrokerService();
+ PolicyEntry defaultEntry = new PolicyEntry();
+ defaultEntry.setProducerFlowControl(false);
+ defaultEntry.setMemoryLimit(DESTINATION_MEMORY_LIMIT);
+ defaultEntry.setExpireMessagesPeriod(0); // Disable background
message expiration process
+ PolicyMap policyMap = new PolicyMap();
+ policyMap.setDefaultEntry(defaultEntry);
+ brokerService.setDestinationPolicy(policyMap);
+ brokerService.setUseJmx(false);
+ brokerService.setDataDirectory(BROKER_DATA_DIRECTORY);
+ KahaDBPersistenceAdapter persistenceAdapter = new
KahaDBPersistenceAdapter();
+ persistenceAdapter.setDirectory(new
File(brokerService.getBrokerDataDirectory(),"KahaDB"));
+ brokerService.setPersistenceAdapter(persistenceAdapter);
+
brokerService.getSystemUsage().getTempUsage().setLimit(BROKER_TEMP_USAGE_LIMIT);
+
brokerService.getSystemUsage().getMemoryUsage().setLimit(BROKER_MEMORY_LIMIT);
+ brokerService.start();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ try {
+ // Stop Broker
+ if(brokerService != null)
+ brokerService.stop();
+ } finally {
+ // Delete AMQ data directory
+ FileUtils.deleteDirectory(new File(BROKER_DATA_DIRECTORY));
+ }
+ }
+
+ @Test
+ public void destinationMemoryFullMessageExpirationTest() throws Exception {
+ ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(BROKER_URL);
+
+ // Producer
+ final long messageTtl = 50L;
+ Producer producer = new Producer(connectionFactory, QUEUE_NAME,
messageTtl, 512);
+ Thread produderThread = new Thread(producer, "Producer");
+ produderThread.start();
+
+ // Wait till temporary storage is used
+ while(getTempPercentUsage(QUEUE_NAME) <= 0) {
+ Thread.sleep(20);
+ }
+
+ // Consumer
+ Consumer consumer = new Consumer(connectionFactory, QUEUE_NAME);
+ Thread consumerThread = new Thread(consumer, "Consumer");
+ consumerThread.start();
+
+ // Stop Producer after at least one message is received
+ while (consumer.getReceivedMessages().size() <= 0) {
+ Thread.sleep(20);
+ }
+ producer.stop();
+ produderThread.join();
+ assertNull(producer.getException());
+ LOG.info(String.format(
+ "Producer: SentMessageCount=%d, ExpirationCount=%d, QueueSize=%d
(MemoryPercentageUsage=%d%%, TempPercentUsage=%d%%)",
+ producer.getSentMessages(),
+ getExpirationCount(QUEUE_NAME),
+ getQueueSize(QUEUE_NAME),
+ getMemoryPercentUsage(QUEUE_NAME),
+ getTempPercentUsage(QUEUE_NAME)
+ ));
+
+
+ // Wait till received messages + expiration messages = sentMessages
(or timeout expired)
+ final long sentMessagesCount = producer.getSentMessages();
+ final long deadlineMs = System.currentTimeMillis() + (60 * 1000);
+ long processedMessagesCount;
+ do {
+ processedMessagesCount = consumer.getReceivedMessages().size() +
getExpirationCount(QUEUE_NAME);
+ } while(processedMessagesCount < sentMessagesCount &&
System.currentTimeMillis() < deadlineMs);
+
+
+ // Stop Consumer
+ consumer.stop();
+ consumerThread.join();
+ assertNull(consumer.getException());
+ LOG.info(String.format(
+ "Consumer: ReceivedMessageCount=%d, ExpirationCount=%d,
QueueSize=%d (SentMessageCount=%d, MemoryPercentageUsage=%d%%,
TempPercentUsage=%d%%)",
+ consumer.getReceivedMessages().size(),
+ getExpirationCount(QUEUE_NAME),
+ getQueueSize(QUEUE_NAME),
+ producer.getSentMessages(),
+ getMemoryPercentUsage(QUEUE_NAME),
+ getTempPercentUsage(QUEUE_NAME)
+ ));
+ assertEquals(0, getQueueSize(QUEUE_NAME));
+ assertEquals(sentMessagesCount, processedMessagesCount);
+ }
+
+ private long getQueueSize(String queueName) throws Exception {
+ org.apache.activemq.broker.region.Destination destination =
brokerService.getDestination(
+ new ActiveMQQueue(queueName)
+ );
+ return destination.getDestinationStatistics().getMessages().getCount();
+ }
+
+ private long getExpirationCount(String queueName) throws Exception {
+ org.apache.activemq.broker.region.Destination destination =
brokerService.getDestination(
+ new ActiveMQQueue(queueName)
+ );
+ return destination.getDestinationStatistics().getExpired().getCount();
+ }
+
+ private int getMemoryPercentUsage(String queueName) throws Exception {
+ org.apache.activemq.broker.region.Destination destination =
brokerService.getDestination(
+ new ActiveMQQueue(queueName)
+ );
+ return destination.getMemoryUsage().getPercentUsage();
+ }
+
+ private int getTempPercentUsage(String queueName) throws Exception {
+ org.apache.activemq.broker.region.Destination destination =
brokerService.getDestination(
+ new ActiveMQQueue(queueName)
+ );
+ return destination.getTempUsage().getPercentUsage();
+ }
+
+ private class Producer implements Runnable {
+ private static final String MESSAGE_DATA = "012346789";
+
+ private final AtomicLong messageId = new AtomicLong();
+ private final AtomicReference<Exception> exception = new
AtomicReference<>();
+ private final String queueName;
+ private final long messageTtl;
+ private final int messageBodySize;
+ private final Connection con;
+ private volatile boolean isStopped = false;
+
+ public Producer(
+ ConnectionFactory connectionFactory, String queueName, long
messageTtl, int messageBodySize
+ ) throws JMSException {
+ this.queueName = queueName;
+ this.messageTtl = messageTtl;
+ this.messageBodySize = messageBodySize;
+ this.con = connectionFactory.createConnection();
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (!isStopped) {
+ Message message = sendMessage(
+ con, queueName, messageId.incrementAndGet(), messageTtl,
messageBodySize
+ );
+ logMessageSent(message);
+ }
+ } catch(Exception e) {
+ exception.set(e);
+ } finally {
+ try {
+ close();
+ } catch(Exception e) {
+ exception.set(e);
+ }
+ }
+ }
+
+ public void stop() {
+ isStopped = true;
+ }
+
+ public long getSentMessages() {
+ return messageId.get();
+ }
+
+ public Exception getException() {
+ return exception.get();
+ }
+
+ private Message sendMessage(
+ Connection con, String queueName, long messageId, long messageTTL,
int messageBodySize
+ ) throws Exception {
+ TextMessage message;
+ Session session = con.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ try {
+ Destination destination = new ActiveMQQueue(queueName);
+ MessageProducer producer = session.createProducer(destination);
+ try {
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ if (messageTTL > 0 ) {
+ producer.setTimeToLive(messageTTL);
+ }
+ message =
session.createTextMessage(createMessageBody(messageBodySize));
+ message.setStringProperty(MESSAGE_ID_PROPERTY_NAME,
String.valueOf(messageId));
+ producer.send(message);
+ } finally {
+ producer.close();
+ }
+ } finally {
+ session.close();
+ }
+ return message;
+ }
+
+ private String createMessageBody(int size) {
+ StringBuilder sb = new StringBuilder(size);
+ int messageDataLength = MESSAGE_DATA.length();
+ for (int i = 0; i < size; i++) {
+ sb.append(MESSAGE_DATA.charAt(i % messageDataLength));
+ }
+ return sb.toString();
+ }
+
+ private void logMessageSent(Message message) throws Exception {
+ LOG.debug(String.format(
+ "MessageId %s sent (Count=%d), MemoryPercentageUsage=%d%%,
TempPercentUsage=%d%%, ExpirationCount=%d, QueueSize=%d",
+ message.getStringProperty(MESSAGE_ID_PROPERTY_NAME),
+ messageId.get(),
+ getMemoryPercentUsage(queueName),
+ getTempPercentUsage(queueName),
+ getExpirationCount(queueName),
+ getQueueSize(queueName)
+ ));
+ }
+
+ private void close() throws JMSException {
+ con.close();
+ }
+
+ }
+
+ private class Consumer implements Runnable {
+ private final List<Message> receivedMessages = new ArrayList<>();
+ private final AtomicReference<Exception> exception = new
AtomicReference<>();
+ private final String queueName;
+ private final Connection con;
+ private volatile boolean isStopped = false;
+
+ public Consumer(ConnectionFactory connectionFactory, String queueName)
throws JMSException {
+ this.queueName = queueName;
+ this.con = connectionFactory.createConnection();
+ this.con.start();
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (!isStopped) {
+ Message message = receiveMessage(con, queueName);
+ if (message != null) {
+ addReceivedMessage(message);
+ }
+ logMessageReceived(message);
+ }
+ } catch(Exception e) {
+ exception.set(e);
+ } finally {
+ try {
+ close();
+ } catch(Exception e) {
+ exception.set(e);
+ }
+ }
+ }
+
+ public void stop() {
+ isStopped = true;
+ }
+
+ public List<Message> getReceivedMessages() {
+ synchronized(this) {
+ return new ArrayList<>(receivedMessages);
+ }
+ }
+
+ public Exception getException() {
+ return exception.get();
+ }
+
+ private Message receiveMessage(Connection con, String queueName)
throws Exception {
+ Session session = con.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ try {
+ Destination destination = new ActiveMQQueue(queueName);
+ MessageConsumer consumer = session.createConsumer(destination);
+ try {
+ return consumer.receive(1000);
+ } finally {
+ consumer.close();
+ }
+ } finally {
+ session.close();
+ }
+ }
+
+ private void logMessageReceived(Message message) throws Exception {
+ int memoryPercentUsage = getMemoryPercentUsage(queueName);
+ int tempPercentUsage = getTempPercentUsage(queueName);
+ long expirationCount = getExpirationCount(queueName);
+ long queueSize = getQueueSize(queueName);
+ if(message != null) {
+ LOG.debug(String.format(
+ "MessageId %s received (Count=%d),
MemoryPercentageUsage=%d%%, TempPercentUsage=%d%%, ExpirationCount=%d,
QueueSize=%d",
+ message.getStringProperty(MESSAGE_ID_PROPERTY_NAME),
+ receivedMessages.size(),
+ memoryPercentUsage,
+ tempPercentUsage,
+ expirationCount,
+ queueSize
+ ));
+ } else {
+ LOG.debug(String.format(
+ "Message wasn't receive, MemoryPercentageUsage=%d%%,
TempPercentUsage=%d%%, ExpirationCount=%d, QueueSize=%d",
+ memoryPercentUsage,
+ tempPercentUsage,
+ expirationCount,
+ queueSize
+ ));
+ }
+ }
+
+ private void addReceivedMessage(Message message) {
+ synchronized(receivedMessages) {
+ receivedMessages.add(message);
+ }
+ }
+
+ private void close() throws JMSException {
+ con.close();
+ }
+
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact