This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 30c8fc7b10 ARTEMIS-5376 Include all messages in queue management operations
30c8fc7b10 is described below
commit 30c8fc7b10555478dfe6f912b38f312e3a39b165
Author: AntonRoskvist <[email protected]>
AuthorDate: Fri Feb 6 15:15:22 2026 +0100
ARTEMIS-5376 Include all messages in queue management operations
---
.../artemis/core/server/impl/QueueImpl.java | 196 ++++++++++-----------
.../management/ManagementWithPagingServerTest.java | 10 +-
.../integration/management/QueueControlTest.java | 161 +++++++++++++++++
.../management/QueueControlUsingCoreTest.java | 2 +-
4 files changed, 264 insertions(+), 105 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 5ff8f65021..32cd4bfa5e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Predicate;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -2047,6 +2048,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
QueueIterateAction messageAction) throws Exception {
int count = 0;
int txCount = 0;
+
+ if (filter1 != null) {
+ messageAction.addFilter(filter1);
+ }
+
// This is to avoid scheduling depaging while iterQueue is happening
// this should minimize the use of the paged executor.
depagePending = true;
@@ -2065,7 +2071,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
while (iter.hasNext() &&
!messageAction.expectedHitsReached(count)) {
MessageReference ref = iter.next();
- if (filter1 == null || filter1.match(ref.getMessage())) {
+ if (messageAction.match(ref)) {
if (messageAction.actMessage(tx, ref)) {
iter.remove();
refRemoved(ref);
@@ -2087,7 +2093,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return count;
}
- List<MessageReference> cancelled =
scheduledDeliveryHandler.cancel(ref -> filter1 == null ? true :
filter1.match(ref.getMessage()));
+ List<MessageReference> cancelled =
scheduledDeliveryHandler.cancel(messageAction::match);
for (MessageReference messageReference : cancelled) {
messageAction.actMessage(tx, messageReference);
count++;
@@ -2110,12 +2116,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
PagedReference reference = pageIterator.next();
pageIterator.remove();
- if (filter1 == null || filter1.match(reference.getMessage())) {
- count++;
- txCount++;
+ if (messageAction.match(reference)) {
if (!messageAction.actMessage(tx, reference)) {
addTail(reference, false);
}
+ txCount++;
+ count++;
} else {
addTail(reference, false);
}
@@ -2433,71 +2439,54 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@Override
- public synchronized boolean sendMessageToDeadLetterAddress(final long
messageID) throws Exception {
- try (LinkedListIterator<MessageReference> iter = iterator()) {
- while (iter.hasNext()) {
- MessageReference ref = iter.next();
- if (ref.getMessage().getMessageID() == messageID) {
- incDelivering(ref);
- sendToDeadLetterAddress(null, ref);
- iter.remove();
- refRemoved(ref);
- return true;
- }
- }
- if (pageIterator != null && !queueDestroyed) {
- while (pageIterator.hasNext()) {
- PagedReference ref = pageIterator.next();
- if (ref.getMessage().getMessageID() == messageID) {
- incDelivering(ref);
- sendToDeadLetterAddress(null, ref);
- pageIterator.remove();
- refRemoved(ref);
- return true;
- }
- }
+ public boolean sendMessageToDeadLetterAddress(final long messageID) throws
Exception {
+
+ return iterQueue(DEFAULT_FLUSH_LIMIT, null, new
QueueIterateAction(messageID) {
+
+ @Override
+ public boolean actMessage(Transaction tx, MessageReference ref)
throws Exception {
+ incDelivering(ref);
+ sendToDeadLetterAddress(tx, ref);
+ return true;
}
- return false;
- }
+
+ }) == 1;
+
}
@Override
- public synchronized int sendMessagesToDeadLetterAddress(Filter filter)
throws Exception {
+ public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception {
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
@Override
public boolean actMessage(Transaction tx, MessageReference ref)
throws Exception {
-
incDelivering(ref);
- return sendToDeadLetterAddress(tx, ref);
+ sendToDeadLetterAddress(tx, ref);
+ return true;
}
+
});
+
}
@Override
- public synchronized boolean moveReference(final long messageID,
- final SimpleString toAddress,
- final Binding binding,
- final boolean rejectDuplicate)
throws Exception {
- try (LinkedListIterator<MessageReference> iter = iterator()) {
- while (iter.hasNext()) {
- MessageReference ref = iter.next();
- if (ref.getMessage().getMessageID() == messageID) {
- iter.remove();
- refRemoved(ref);
- incDelivering(ref);
- try {
- move(null, toAddress, binding, ref, rejectDuplicate,
AckReason.NORMAL, null, null, true);
- } catch (Exception e) {
- decDelivering(ref);
- throw e;
- }
- return true;
- }
+ public boolean moveReference(final long messageID,
+ final SimpleString toAddress,
+ final Binding binding,
+ final boolean rejectDuplicate) throws
Exception {
+
+ return iterQueue(DEFAULT_FLUSH_LIMIT, null, new
QueueIterateAction(messageID) {
+
+ @Override
+ public boolean actMessage(Transaction tx, MessageReference ref)
throws Exception {
+ incDelivering(ref);
+ move(tx, toAddress, binding, ref, rejectDuplicate,
AckReason.NORMAL, null, null, true);
+ return true;
}
- return false;
- }
+
+ }) == 1;
+
}
@Override
@@ -2543,7 +2532,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
if (!ignored) {
- move(null, toAddress, binding, ref, rejectDuplicates,
AckReason.NORMAL, null, null, true);
+ move(tx, toAddress, binding, ref, rejectDuplicates,
AckReason.NORMAL, null, null, true);
}
return true;
@@ -2561,26 +2550,23 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@Override
- public synchronized boolean copyReference(final long messageID,
- final SimpleString toQueue,
- final Binding binding) throws
Exception {
- try (LinkedListIterator<MessageReference> iter = iterator()) {
- while (iter.hasNext()) {
- MessageReference ref = iter.next();
- if (ref.getMessage().getMessageID() == messageID) {
- try {
- copy(null, toQueue, binding, ref);
- } catch (Exception e) {
- throw e;
- }
- return true;
- }
+ public boolean copyReference(final long messageID,
+ final SimpleString toQueue,
+ final Binding binding) throws Exception {
+
+ return iterQueue(DEFAULT_FLUSH_LIMIT, null, new
QueueIterateAction(messageID) {
+
+ @Override
+ public boolean actMessage(Transaction tx, MessageReference ref)
throws Exception {
+ copy(tx, toQueue, binding, ref);
+ return false;
}
- return false;
- }
+
+ }) == 1;
+
}
- public synchronized int rerouteMessages(final SimpleString queueName, final
Filter filter) throws Exception {
+ public int rerouteMessages(final SimpleString queueName, final Filter
filter) throws Exception {
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
@Override
public boolean actMessage(Transaction tx, MessageReference ref)
throws Exception {
@@ -2649,40 +2635,33 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@Override
- public synchronized boolean changeReferencePriority(final long messageID,
final byte newPriority) throws Exception {
- try (LinkedListIterator<MessageReference> iter = iterator()) {
+ public boolean changeReferencePriority(final long messageID, final byte
newPriority) throws Exception {
- while (iter.hasNext()) {
- MessageReference ref = iter.next();
- if (ref.getMessage().getMessageID() == messageID) {
- iter.remove();
- refRemoved(ref);
- ref.getMessage().setPriority(newPriority);
- addTail(ref, false);
- return true;
- }
+ return iterQueue(DEFAULT_FLUSH_LIMIT, null, new
QueueIterateAction(messageID) {
+
+ @Override
+ public boolean actMessage(Transaction tx, MessageReference ref)
throws Exception {
+ ref.getMessage().setPriority(newPriority);
+ return false;
}
- return false;
- }
+ }) == 1;
+
}
@Override
- public synchronized int changeReferencesPriority(final Filter filter, final
byte newPriority) throws Exception {
- try (LinkedListIterator<MessageReference> iter = iterator()) {
- int count = 0;
- while (iter.hasNext()) {
- MessageReference ref = iter.next();
- if (filter == null || filter.match(ref.getMessage())) {
- count++;
- iter.remove();
- refRemoved(ref);
- ref.getMessage().setPriority(newPriority);
- addTail(ref, false);
- }
+ public int changeReferencesPriority(final Filter filter, final byte
newPriority) throws Exception {
+
+ return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
+
+ @Override
+ public boolean actMessage(Transaction tx, MessageReference ref)
throws Exception {
+ ref.getMessage().setPriority(newPriority);
+ return false;
}
- return count;
- }
+
+ });
+
}
@Override
@@ -4222,13 +4201,23 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
abstract class QueueIterateAction {
protected Integer expectedHits;
+ protected Long messageID;
+ protected Filter filter1 = null;
+ protected Predicate<MessageReference> match;
QueueIterateAction(Integer expectedHits) {
this.expectedHits = expectedHits;
+ this.match = ref -> filter1 == null ? true :
filter1.match(ref.getMessage());
+ }
+
+ QueueIterateAction(Long messageID) {
+ this.expectedHits = 1;
+ this.match = ref -> ref.getMessage().getMessageID() == messageID;
}
QueueIterateAction() {
this.expectedHits = null;
+ this.match = ref -> filter1 == null ? true :
filter1.match(ref.getMessage());
}
/**
@@ -4243,6 +4232,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
public boolean expectedHitsReached(int currentHits) {
return expectedHits != null && currentHits >= expectedHits.intValue();
}
+
+ public void addFilter(Filter filter1) {
+ this.filter1 = filter1;
+ }
+
+ public boolean match(MessageReference ref) {
+ return match.test(ref);
+ }
+
}
// For external use we need to use a synchronized version since the list is not thread safe
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithPagingServerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithPagingServerTest.java
index cb564b9e47..09ddd9006b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithPagingServerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithPagingServerTest.java
@@ -17,7 +17,6 @@
package org.apache.activemq.artemis.tests.integration.management;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -230,7 +229,7 @@ public class ManagementWithPagingServerTest extends ManagementTestBase {
long messageID = (Long) messages[99].get("messageID");
- assertFalse(queueControl.copyMessage(messageID, otherQueue.toString()));
+ assertTrue(queueControl.copyMessage(messageID, otherQueue.toString()));
messageID = (Long) messages[0].get("messageID");
@@ -238,7 +237,8 @@ public class ManagementWithPagingServerTest extends ManagementTestBase {
Map<String, Object>[] copiedMessages =
otherQueueControl.listMessages(null);
- assertEquals(1, copiedMessages.length);
+ //this validates copying of a paged message
+ assertEquals(2, copiedMessages.length);
}
@Test
@@ -281,8 +281,8 @@ public class ManagementWithPagingServerTest extends ManagementTestBase {
messageID = (Long) otherMessages[100].get("messageID");
- //this should fail as the message was paged successfully
- assertFalse(otherQueueControl.copyMessage(messageID, queue.toString()));
+ //this validates copying of a paged message
+ assertTrue(otherQueueControl.copyMessage(messageID, queue.toString()));
}
@Test
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 00a0205a2f..38f9ea10e8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -5051,7 +5051,157 @@ public class QueueControlTest extends ManagementTestBase {
clientConsumer.close();
}
+ @TestTemplate
+ public void testChangeMessagesPriorityIncludesPagedMessage() throws
Exception {
+ final SimpleString queueName = SimpleString.of("queue");
+ final String sampleText = "Message Content";
+ final int messageCount = 10;
+
+ AddressSettings addressSettings = new
AddressSettings().setMaxSizeBytes(200L);
+ server.getAddressSettingsRepository().addMatch(queueName.toString(),
addressSettings);
+
session.createQueue(QueueConfiguration.of(queueName).setDurable(durable));
+
+ // Send messages to the queue and make sure it enters paging.
+ ClientProducer producer = session.createProducer(queueName);
+ for (int i = 0; i < messageCount; i++) {
+ producer.send(createTextMessage(session,
sampleText).setPriority((byte) 2));
+ }
+
+
Wait.assertTrue(server.locateQueue(queueName).getPagingStore()::isPaging);
+
+ //Send one more message now that the address is paging
+ producer.send(createTextMessage(session, sampleText).setPriority((byte)
2));
+
+ QueueControl queueControl = createManagementControl(queueName,
queueName);
+
+ for (Map<String, Object> messageMap : queueControl.listMessages(null)) {
+ assertEquals(2, getPriorityStraight(messageMap.get("priority")));
+ }
+
+ queueControl.changeMessagesPriority(null, 5);
+
+ //Make sure priority is changed on all messages
+ for (Map<String, Object> messageMap : queueControl.listMessages(null)) {
+ assertEquals(5, getPriorityStraight(messageMap.get("priority")));
+ }
+
+ queueControl.removeAllMessages();
+
+ }
+
+ @TestTemplate
+ public void testChangeMessagePriorityIncludesPagedMessage() throws
Exception {
+ final SimpleString queueName = SimpleString.of("queue");
+ final String sampleText = "Message Content";
+ final int messageCount = 10;
+
+ AddressSettings addressSettings = new
AddressSettings().setMaxSizeBytes(200L);
+ server.getAddressSettingsRepository().addMatch(queueName.toString(),
addressSettings);
+
session.createQueue(QueueConfiguration.of(queueName).setDurable(durable));
+
+ // Send messages to the queue and make sure it enters paging.
+ ClientProducer producer = session.createProducer(queueName);
+ for (int i = 0; i < messageCount; i++) {
+ producer.send(createTextMessage(session, sampleText));
+ }
+
+
Wait.assertTrue(server.locateQueue(queueName).getPagingStore()::isPaging);
+
+ //Send identifiable message to paging queue
+ producer.send(createTextMessage(session,
sampleText).putStringProperty("myID", "unique").setPriority((byte) 2));
+
+ QueueControl queueControl = createManagementControl(queueName,
queueName);
+ Map<String, Object>[] messages = queueControl.listMessages(null);
+ long messageID = (Long) messages[messageCount].get("messageID");
+
+ assertEquals(2,
getPriorityStraight(messages[messageCount].get("priority")));
+
+ queueControl.changeMessagePriority(messageID, 5);
+
+ //Make sure priority is changed on the message
+ for (Map<String, Object> messageMap : queueControl.listMessages(null)) {
+ if (messageMap.get("messageID").equals(messageID)) {
+ assertEquals(5, getPriorityStraight(messageMap.get("priority")));
+ }
+ }
+
+ queueControl.removeAllMessages();
+
+ }
+ @TestTemplate
+ public void testMoveMessageIncludesPagedMessage() throws Exception {
+ final String queueNameMatch = "queue.#";
+ final SimpleString queueName1 = SimpleString.of("queue.1");
+ final SimpleString queueName2 = SimpleString.of("queue.2");
+ final String sampleText = "Message Content";
+ final int messageCount = 10;
+
+ AddressSettings addressSettings = new
AddressSettings().setMaxSizeBytes(200L);
+ server.getAddressSettingsRepository().addMatch(queueNameMatch,
addressSettings);
+
session.createQueue(QueueConfiguration.of(queueName1).setDurable(durable));
+
session.createQueue(QueueConfiguration.of(queueName2).setDurable(durable));
+
+ // Send messages to the queue and make sure it enters paging.
+ ClientProducer producer = session.createProducer(queueName1);
+ for (int i = 0; i < messageCount; i++) {
+ producer.send(createTextMessage(session, sampleText));
+ }
+
+
Wait.assertTrue(server.locateQueue(queueName1).getPagingStore()::isPaging);
+
+ //Send identifiable message to paging queue
+ producer.send(createTextMessage(session,
sampleText).putStringProperty("myID", "unique").setPriority((byte) 2));
+
+ QueueControl queueControl = createManagementControl(queueName1,
queueName1);
+ Map<String, Object>[] messages = queueControl.listMessages(null);
+ long messageID = (Long) messages[messageCount].get("messageID");
+
+ assertEquals(messageCount + 1,
server.locateQueue(queueName1).getMessageCount());
+
+ queueControl.moveMessage(messageID, queueName2.toString());
+
+ assertEquals(messageCount,
server.locateQueue(queueName1).getMessageCount());
+ assertEquals(1, server.locateQueue(queueName2).getMessageCount());
+
+ }
+
+ @TestTemplate
+ public void testCopyMessageIncludesPagedMessage() throws Exception {
+ final String queueNameMatch = "queue.#";
+ final SimpleString queueName1 = SimpleString.of("queue.1");
+ final SimpleString queueName2 = SimpleString.of("queue.2");
+ final String sampleText = "Message Content";
+ final int messageCount = 10;
+
+ AddressSettings addressSettings = new
AddressSettings().setMaxSizeBytes(200L);
+ server.getAddressSettingsRepository().addMatch(queueNameMatch,
addressSettings);
+
session.createQueue(QueueConfiguration.of(queueName1).setDurable(durable));
+
session.createQueue(QueueConfiguration.of(queueName2).setDurable(durable));
+
+ // Send messages to the queue and make sure it enters paging.
+ ClientProducer producer = session.createProducer(queueName1);
+ for (int i = 0; i < messageCount; i++) {
+ producer.send(createTextMessage(session, sampleText));
+ }
+
+
Wait.assertTrue(server.locateQueue(queueName1).getPagingStore()::isPaging);
+
+ //Send identifiable message to paging queue
+ producer.send(createTextMessage(session,
sampleText).putStringProperty("myID", "unique").setPriority((byte) 2));
+
+ QueueControl queueControl = createManagementControl(queueName1,
queueName1);
+ Map<String, Object>[] messages = queueControl.listMessages(null);
+ long messageID = (Long) messages[messageCount].get("messageID");
+
+ assertEquals(messageCount + 1,
server.locateQueue(queueName1).getMessageCount());
+
+ queueControl.copyMessage(messageID, queueName2.toString());
+
+ assertEquals(messageCount + 1,
server.locateQueue(queueName1).getMessageCount());
+ assertEquals(1, server.locateQueue(queueName2).getMessageCount());
+
+ }
@Override
@BeforeEach
@@ -5068,6 +5218,17 @@ public class QueueControlTest extends ManagementTestBase {
session.start();
}
+ protected int getPriorityStraight(Object priority) {
+ if (priority instanceof Byte) {
+ return ((Byte) priority).intValue();
+
+ } else if (priority instanceof Long) {
+ return ((Long) priority).intValue();
+ }
+
+ return (Integer) priority;
+ }
+
protected long getFirstMessageId(final QueueControl queueControl) throws
Exception {
JsonArray array =
JsonUtil.readJsonArray(queueControl.getFirstMessageAsJSON());
JsonObject object = (JsonObject) array.get(0);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index d467bcf3b6..04aef0bfef 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -150,7 +150,7 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
@Override
public int changeMessagesPriority(final String filter, final int
newPriority) throws Exception {
- return (Integer) proxy.invokeOperation("changeMessagesPriority",
filter, newPriority);
+ return (Integer) proxy.invokeOperation(Integer.class,
"changeMessagesPriority", filter, newPriority);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]