This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 30c8fc7b10 ARTEMIS-5376 Include all messages in queue management operations
30c8fc7b10 is described below

commit 30c8fc7b10555478dfe6f912b38f312e3a39b165
Author: AntonRoskvist <[email protected]>
AuthorDate: Fri Feb 6 15:15:22 2026 +0100

    ARTEMIS-5376 Include all messages in queue management operations
---
 .../artemis/core/server/impl/QueueImpl.java        | 196 ++++++++++-----------
 .../management/ManagementWithPagingServerTest.java |  10 +-
 .../integration/management/QueueControlTest.java   | 161 +++++++++++++++++
 .../management/QueueControlUsingCoreTest.java      |   2 +-
 4 files changed, 264 insertions(+), 105 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 5ff8f65021..32cd4bfa5e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Predicate;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -2047,6 +2048,11 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
                          QueueIterateAction messageAction) throws Exception {
       int count = 0;
       int txCount = 0;
+
+      if (filter1 != null) {
+         messageAction.addFilter(filter1);
+      }
+
       // This is to avoid scheduling depaging while iterQueue is happening
       // this should minimize the use of the paged executor.
       depagePending = true;
@@ -2065,7 +2071,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
                while (iter.hasNext() && 
!messageAction.expectedHitsReached(count)) {
                   MessageReference ref = iter.next();
 
-                  if (filter1 == null || filter1.match(ref.getMessage())) {
+                  if (messageAction.match(ref)) {
                      if (messageAction.actMessage(tx, ref)) {
                         iter.remove();
                         refRemoved(ref);
@@ -2087,7 +2093,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
                   return count;
                }
 
-               List<MessageReference> cancelled = 
scheduledDeliveryHandler.cancel(ref -> filter1 == null ? true : 
filter1.match(ref.getMessage()));
+               List<MessageReference> cancelled = 
scheduledDeliveryHandler.cancel(messageAction::match);
                for (MessageReference messageReference : cancelled) {
                   messageAction.actMessage(tx, messageReference);
                   count++;
@@ -2110,12 +2116,12 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
                PagedReference reference = pageIterator.next();
                pageIterator.remove();
 
-               if (filter1 == null || filter1.match(reference.getMessage())) {
-                  count++;
-                  txCount++;
+               if (messageAction.match(reference)) {
                   if (!messageAction.actMessage(tx, reference)) {
                      addTail(reference, false);
                   }
+                  txCount++;
+                  count++;
                } else {
                   addTail(reference, false);
                }
@@ -2433,71 +2439,54 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    }
 
    @Override
-   public synchronized boolean sendMessageToDeadLetterAddress(final long 
messageID) throws Exception {
-      try (LinkedListIterator<MessageReference> iter = iterator()) {
-         while (iter.hasNext()) {
-            MessageReference ref = iter.next();
-            if (ref.getMessage().getMessageID() == messageID) {
-               incDelivering(ref);
-               sendToDeadLetterAddress(null, ref);
-               iter.remove();
-               refRemoved(ref);
-               return true;
-            }
-         }
-         if (pageIterator != null && !queueDestroyed) {
-            while (pageIterator.hasNext()) {
-               PagedReference ref = pageIterator.next();
-               if (ref.getMessage().getMessageID() == messageID) {
-                  incDelivering(ref);
-                  sendToDeadLetterAddress(null, ref);
-                  pageIterator.remove();
-                  refRemoved(ref);
-                  return true;
-               }
-            }
+   public boolean sendMessageToDeadLetterAddress(final long messageID) throws 
Exception {
+
+      return iterQueue(DEFAULT_FLUSH_LIMIT, null, new 
QueueIterateAction(messageID) {
+
+         @Override
+         public boolean actMessage(Transaction tx, MessageReference ref) 
throws Exception {
+            incDelivering(ref);
+            sendToDeadLetterAddress(tx, ref);
+            return true;
          }
-         return false;
-      }
+
+      }) == 1;
+
    }
 
    @Override
-   public synchronized int sendMessagesToDeadLetterAddress(Filter filter) 
throws Exception {
+   public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception {
 
       return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
 
          @Override
          public boolean actMessage(Transaction tx, MessageReference ref) 
throws Exception {
-
             incDelivering(ref);
-            return sendToDeadLetterAddress(tx, ref);
+            sendToDeadLetterAddress(tx, ref);
+            return true;
          }
+
       });
+
    }
 
    @Override
-   public synchronized boolean moveReference(final long messageID,
-                                             final SimpleString toAddress,
-                                             final Binding binding,
-                                             final boolean rejectDuplicate) 
throws Exception {
-      try (LinkedListIterator<MessageReference> iter = iterator()) {
-         while (iter.hasNext()) {
-            MessageReference ref = iter.next();
-            if (ref.getMessage().getMessageID() == messageID) {
-               iter.remove();
-               refRemoved(ref);
-               incDelivering(ref);
-               try {
-                  move(null, toAddress, binding, ref, rejectDuplicate, 
AckReason.NORMAL, null, null, true);
-               } catch (Exception e) {
-                  decDelivering(ref);
-                  throw e;
-               }
-               return true;
-            }
+   public boolean moveReference(final long messageID,
+                                final SimpleString toAddress,
+                                final Binding binding,
+                                final boolean rejectDuplicate) throws 
Exception {
+
+      return iterQueue(DEFAULT_FLUSH_LIMIT, null, new 
QueueIterateAction(messageID) {
+
+         @Override
+         public boolean actMessage(Transaction tx, MessageReference ref) 
throws Exception {
+            incDelivering(ref);
+            move(tx, toAddress, binding, ref, rejectDuplicate, 
AckReason.NORMAL, null, null, true);
+            return true;
          }
-         return false;
-      }
+
+      }) == 1;
+
    }
 
    @Override
@@ -2543,7 +2532,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
             }
 
             if (!ignored) {
-               move(null, toAddress, binding, ref, rejectDuplicates, 
AckReason.NORMAL, null, null, true);
+               move(tx, toAddress, binding, ref, rejectDuplicates, 
AckReason.NORMAL, null, null, true);
             }
 
             return true;
@@ -2561,26 +2550,23 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    }
 
    @Override
-   public synchronized boolean copyReference(final long messageID,
-                                             final SimpleString toQueue,
-                                             final Binding binding) throws 
Exception {
-      try (LinkedListIterator<MessageReference> iter = iterator()) {
-         while (iter.hasNext()) {
-            MessageReference ref = iter.next();
-            if (ref.getMessage().getMessageID() == messageID) {
-               try {
-                  copy(null, toQueue, binding, ref);
-               } catch (Exception e) {
-                  throw e;
-               }
-               return true;
-            }
+   public boolean copyReference(final long messageID,
+                                final SimpleString toQueue,
+                                final Binding binding) throws Exception {
+
+      return iterQueue(DEFAULT_FLUSH_LIMIT, null, new 
QueueIterateAction(messageID) {
+
+         @Override
+         public boolean actMessage(Transaction tx, MessageReference ref) 
throws Exception {
+            copy(tx, toQueue, binding, ref);
+            return false;
          }
-         return false;
-      }
+
+      }) == 1;
+
    }
 
-   public synchronized int rerouteMessages(final SimpleString queueName, final 
Filter filter) throws Exception {
+   public int rerouteMessages(final SimpleString queueName, final Filter 
filter) throws Exception {
       return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
          @Override
          public boolean actMessage(Transaction tx, MessageReference ref) 
throws Exception {
@@ -2649,40 +2635,33 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    }
 
    @Override
-   public synchronized boolean changeReferencePriority(final long messageID, 
final byte newPriority) throws Exception {
-      try (LinkedListIterator<MessageReference> iter = iterator()) {
+   public boolean changeReferencePriority(final long messageID, final byte 
newPriority) throws Exception {
 
-         while (iter.hasNext()) {
-            MessageReference ref = iter.next();
-            if (ref.getMessage().getMessageID() == messageID) {
-               iter.remove();
-               refRemoved(ref);
-               ref.getMessage().setPriority(newPriority);
-               addTail(ref, false);
-               return true;
-            }
+      return iterQueue(DEFAULT_FLUSH_LIMIT, null, new 
QueueIterateAction(messageID) {
+
+         @Override
+         public boolean actMessage(Transaction tx, MessageReference ref) 
throws Exception {
+            ref.getMessage().setPriority(newPriority);
+            return false;
          }
 
-         return false;
-      }
+      }) == 1;
+
    }
 
    @Override
-   public synchronized int changeReferencesPriority(final Filter filter, final 
byte newPriority) throws Exception {
-      try (LinkedListIterator<MessageReference> iter = iterator()) {
-         int count = 0;
-         while (iter.hasNext()) {
-            MessageReference ref = iter.next();
-            if (filter == null || filter.match(ref.getMessage())) {
-               count++;
-               iter.remove();
-               refRemoved(ref);
-               ref.getMessage().setPriority(newPriority);
-               addTail(ref, false);
-            }
+   public int changeReferencesPriority(final Filter filter, final byte 
newPriority) throws Exception {
+
+      return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
+
+         @Override
+         public boolean actMessage(Transaction tx, MessageReference ref) 
throws Exception {
+            ref.getMessage().setPriority(newPriority);
+            return false;
          }
-         return count;
-      }
+
+      });
+
    }
 
    @Override
@@ -4222,13 +4201,23 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    abstract class QueueIterateAction {
 
       protected Integer expectedHits;
+      protected Long messageID;
+      protected Filter filter1 = null;
+      protected Predicate<MessageReference> match;
 
       QueueIterateAction(Integer expectedHits) {
          this.expectedHits = expectedHits;
+         this.match = ref -> filter1 == null ? true : 
filter1.match(ref.getMessage());
+      }
+
+      QueueIterateAction(Long messageID) {
+         this.expectedHits = 1;
+         this.match = ref -> ref.getMessage().getMessageID() == messageID;
       }
 
       QueueIterateAction() {
          this.expectedHits = null;
+         this.match = ref -> filter1 == null ? true : 
filter1.match(ref.getMessage());
       }
 
       /**
@@ -4243,6 +4232,15 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       public boolean expectedHitsReached(int currentHits) {
          return expectedHits != null && currentHits >= expectedHits.intValue();
       }
+
+      public void addFilter(Filter filter1) {
+         this.filter1 = filter1;
+      }
+
+      public boolean match(MessageReference ref) {
+         return match.test(ref);
+      }
+
    }
 
    // For external use we need to use a synchronized version since the list is 
not thread safe
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithPagingServerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithPagingServerTest.java
index cb564b9e47..09ddd9006b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithPagingServerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithPagingServerTest.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.artemis.tests.integration.management;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -230,7 +229,7 @@ public class ManagementWithPagingServerTest extends 
ManagementTestBase {
 
       long messageID = (Long) messages[99].get("messageID");
 
-      assertFalse(queueControl.copyMessage(messageID, otherQueue.toString()));
+      assertTrue(queueControl.copyMessage(messageID, otherQueue.toString()));
 
       messageID = (Long) messages[0].get("messageID");
 
@@ -238,7 +237,8 @@ public class ManagementWithPagingServerTest extends 
ManagementTestBase {
 
       Map<String, Object>[] copiedMessages = 
otherQueueControl.listMessages(null);
 
-      assertEquals(1, copiedMessages.length);
+      //this validates copying of a paged message
+      assertEquals(2, copiedMessages.length);
    }
 
    @Test
@@ -281,8 +281,8 @@ public class ManagementWithPagingServerTest extends 
ManagementTestBase {
 
       messageID = (Long) otherMessages[100].get("messageID");
 
-      //this should fail as the message was paged successfully
-      assertFalse(otherQueueControl.copyMessage(messageID, queue.toString()));
+      //this validates copying of a paged message
+      assertTrue(otherQueueControl.copyMessage(messageID, queue.toString()));
    }
 
    @Test
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 00a0205a2f..38f9ea10e8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -5051,7 +5051,157 @@ public class QueueControlTest extends 
ManagementTestBase {
       clientConsumer.close();
    }
 
+   @TestTemplate
+   public void testChangeMessagesPriorityIncludesPagedMessage() throws 
Exception {
+      final SimpleString queueName = SimpleString.of("queue");
+      final String sampleText = "Message Content";
+      final int messageCount = 10;
+
+      AddressSettings addressSettings = new 
AddressSettings().setMaxSizeBytes(200L);
+      server.getAddressSettingsRepository().addMatch(queueName.toString(), 
addressSettings);
+      
session.createQueue(QueueConfiguration.of(queueName).setDurable(durable));
+
+      // Send message to queue, make sure it enters paging.
+      ClientProducer producer = session.createProducer(queueName);
+      for (int i = 0; i < messageCount; i++) {
+         producer.send(createTextMessage(session, 
sampleText).setPriority((byte) 2));
+      }
+
+      
Wait.assertTrue(server.locateQueue(queueName).getPagingStore()::isPaging);
+
+      //Send identifiable message to paging queue
+      producer.send(createTextMessage(session, sampleText).setPriority((byte) 
2));
+
+      QueueControl queueControl = createManagementControl(queueName, 
queueName);
+
+      for (Map<String, Object> messageMap : queueControl.listMessages(null)) {
+         assertEquals(2, getPriorityStraight(messageMap.get("priority")));
+      }
+
+      queueControl.changeMessagesPriority(null, 5);
+
+      //Make sure priority is changed on all messages
+      for (Map<String, Object> messageMap : queueControl.listMessages(null)) {
+         assertEquals(5, getPriorityStraight(messageMap.get("priority")));
+      }
+
+      queueControl.removeAllMessages();
+
+   }
+
+   @TestTemplate
+   public void testChangeMessagePriorityIncludesPagedMessage() throws 
Exception {
+      final SimpleString queueName = SimpleString.of("queue");
+      final String sampleText = "Message Content";
+      final int messageCount = 10;
+
+      AddressSettings addressSettings = new 
AddressSettings().setMaxSizeBytes(200L);
+      server.getAddressSettingsRepository().addMatch(queueName.toString(), 
addressSettings);
+      
session.createQueue(QueueConfiguration.of(queueName).setDurable(durable));
+
+      // Send message to queue, make sure it enters paging.
+      ClientProducer producer = session.createProducer(queueName);
+      for (int i = 0; i < messageCount; i++) {
+         producer.send(createTextMessage(session, sampleText));
+      }
+
+      
Wait.assertTrue(server.locateQueue(queueName).getPagingStore()::isPaging);
+
+      //Send identifiable message to paging queue
+      producer.send(createTextMessage(session, 
sampleText).putStringProperty("myID", "unique").setPriority((byte) 2));
+
+      QueueControl queueControl = createManagementControl(queueName, 
queueName);
+      Map<String, Object>[] messages = queueControl.listMessages(null);
+      long messageID = (Long) messages[messageCount].get("messageID");
+
+      assertEquals(2, 
getPriorityStraight(messages[messageCount].get("priority")));
+
+      queueControl.changeMessagePriority(messageID, 5);
+
+      //Make sure priority is changed on the message
+      for (Map<String, Object> messageMap : queueControl.listMessages(null)) {
+         if (messageMap.get("messageID").equals(messageID)) {
+            assertEquals(5, getPriorityStraight(messageMap.get("priority")));
+         }
+      }
+
+      queueControl.removeAllMessages();
+
+   }
 
+   @TestTemplate
+   public void testMoveMessageIncludesPagedMessage() throws Exception {
+      final String queueNameMatch = "queue.#";
+      final SimpleString queueName1 = SimpleString.of("queue.1");
+      final SimpleString queueName2 = SimpleString.of("queue.2");
+      final String sampleText = "Message Content";
+      final int messageCount = 10;
+
+      AddressSettings addressSettings = new 
AddressSettings().setMaxSizeBytes(200L);
+      server.getAddressSettingsRepository().addMatch(queueNameMatch, 
addressSettings);
+      
session.createQueue(QueueConfiguration.of(queueName1).setDurable(durable));
+      
session.createQueue(QueueConfiguration.of(queueName2).setDurable(durable));
+
+      // Send message to queue, make sure it enters paging.
+      ClientProducer producer = session.createProducer(queueName1);
+      for (int i = 0; i < messageCount; i++) {
+         producer.send(createTextMessage(session, sampleText));
+      }
+
+      
Wait.assertTrue(server.locateQueue(queueName1).getPagingStore()::isPaging);
+
+      //Send identifiable message to paging queue
+      producer.send(createTextMessage(session, 
sampleText).putStringProperty("myID", "unique").setPriority((byte) 2));
+
+      QueueControl queueControl = createManagementControl(queueName1, 
queueName1);
+      Map<String, Object>[] messages = queueControl.listMessages(null);
+      long messageID = (Long) messages[messageCount].get("messageID");
+
+      assertEquals(messageCount + 1, 
server.locateQueue(queueName1).getMessageCount());
+
+      queueControl.moveMessage(messageID, queueName2.toString());
+
+      assertEquals(messageCount, 
server.locateQueue(queueName1).getMessageCount());
+      assertEquals(1, server.locateQueue(queueName2).getMessageCount());
+
+   }
+
+   @TestTemplate
+   public void testCopyMessageIncludesPagedMessage() throws Exception {
+      final String queueNameMatch = "queue.#";
+      final SimpleString queueName1 = SimpleString.of("queue.1");
+      final SimpleString queueName2 = SimpleString.of("queue.2");
+      final String sampleText = "Message Content";
+      final int messageCount = 10;
+
+      AddressSettings addressSettings = new 
AddressSettings().setMaxSizeBytes(200L);
+      server.getAddressSettingsRepository().addMatch(queueNameMatch, 
addressSettings);
+      
session.createQueue(QueueConfiguration.of(queueName1).setDurable(durable));
+      
session.createQueue(QueueConfiguration.of(queueName2).setDurable(durable));
+
+      // Send message to queue, make sure it enters paging.
+      ClientProducer producer = session.createProducer(queueName1);
+      for (int i = 0; i < messageCount; i++) {
+         producer.send(createTextMessage(session, sampleText));
+      }
+
+      
Wait.assertTrue(server.locateQueue(queueName1).getPagingStore()::isPaging);
+
+      //Send identifiable message to paging queue
+      producer.send(createTextMessage(session, 
sampleText).putStringProperty("myID", "unique").setPriority((byte) 2));
+
+      QueueControl queueControl = createManagementControl(queueName1, 
queueName1);
+      Map<String, Object>[] messages = queueControl.listMessages(null);
+      long messageID = (Long) messages[messageCount].get("messageID");
+
+      assertEquals(messageCount + 1, 
server.locateQueue(queueName1).getMessageCount());
+
+      queueControl.copyMessage(messageID, queueName2.toString());
+
+      assertEquals(messageCount + 1, 
server.locateQueue(queueName1).getMessageCount());
+      assertEquals(1, server.locateQueue(queueName2).getMessageCount());
+
+   }
 
    @Override
    @BeforeEach
@@ -5068,6 +5218,17 @@ public class QueueControlTest extends ManagementTestBase 
{
       session.start();
    }
 
+   protected int getPriorityStraight(Object priority) {
+      if (priority instanceof Byte) {
+         return ((Byte) priority).intValue();
+
+      } else if (priority instanceof Long) {
+         return ((Long) priority).intValue();
+      }
+
+      return (Integer) priority;
+   }
+
    protected long getFirstMessageId(final QueueControl queueControl) throws 
Exception {
       JsonArray array = 
JsonUtil.readJsonArray(queueControl.getFirstMessageAsJSON());
       JsonObject object = (JsonObject) array.get(0);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index d467bcf3b6..04aef0bfef 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -150,7 +150,7 @@ public class QueueControlUsingCoreTest extends 
QueueControlTest {
 
          @Override
          public int changeMessagesPriority(final String filter, final int 
newPriority) throws Exception {
-            return (Integer) proxy.invokeOperation("changeMessagesPriority", 
filter, newPriority);
+            return (Integer) proxy.invokeOperation(Integer.class, 
"changeMessagesPriority", filter, newPriority);
          }
 
          @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to