This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new f2e5e02cd5 ARTEMIS-5779 add a DLQ retry method with selector support
f2e5e02cd5 is described below

commit f2e5e02cd53fa4bb7552c40f9bbbea3994f61820
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Nov 21 15:31:52 2025 -0500

    ARTEMIS-5779 add a DLQ retry method with selector support
    
    Leverage the existing functionality to add a new API that accepts a filter
    string to be used when retrying messages from the DLQ.
---
 .../apache/activemq/artemis/logs/AuditLogger.java  |   8 ++
 .../artemis/api/core/management/QueueControl.java  |   9 ++
 .../core/management/impl/QueueControlImpl.java     |  18 ++++
 .../integration/management/QueueControlTest.java   | 104 +++++++++++++++++++++
 .../management/QueueControlUsingCoreTest.java      |   5 +
 5 files changed, 144 insertions(+)

diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
index 841455db7b..4ff50500c6 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
@@ -2888,4 +2888,12 @@ public interface AuditLogger {
 
    @LogMessage(id = 601801, value = "User {} is deleting a address on target 
resource: {} {}", level = LogMessage.Level.INFO)
    void destroyAddress(String user, Object source, Object... args);
+
+   static void retryMessages(Object source, Object... args) {
+      BASE_LOGGER.retryMessages(getCaller(), source, parametersList(args));
+   }
+
+   @LogMessage(id = 601802, value = "User {} is retry sending messages on 
target resource: {} {}", level = LogMessage.Level.INFO)
+   void retryMessages(String user, Object source, String args);
+
 }
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index 3aa4850058..97d08fc003 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -456,6 +456,15 @@ public interface QueueControl {
    @Operation(desc = "Retry all messages on a DLQ to their respective original 
queues", impact = MBeanOperationInfo.ACTION)
    int retryMessages() throws Exception;
 
+   /**
+    * Retries all messages on a DLQ that match the given filter to their original queues; for dead letter queues only.
+    *
+    * @param filter a message filter selecting which messages to retry (can be empty)
+    * @return the number of retried messages.
+    */
+   @Operation(desc = "Retry all messages matching the given filter on a DLQ to 
their respective original queues", impact = MBeanOperationInfo.ACTION)
+   int retryMessages(@Parameter(name = "filter", desc = "A message filter (can 
be empty)") String filter) throws Exception;
+
    /**
     * Moves the message corresponding to the specified message ID to the 
specified other queue.
     *
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index ad8b45a81d..de7b6876d1 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -1324,6 +1324,24 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl {
       }
    }
 
+   @Override
+   public int retryMessages(String filter) throws Exception {
+      // Critical task: hold the server management lock for the whole retry so parallel management tasks cannot run
+      try (AutoCloseable lock = server.managementLock()) {
+         if (AuditLogger.isBaseLoggingEnabled()) {
+            AuditLogger.retryMessages(queue, filter);
+         }
+         checkStarted();
+         clearIO();
+
+         try {
+            return queue.retryMessages(FilterImpl.createFilter(filter));
+         } finally {
+            blockOnIO(); // in finally: runs whether the retry returned or threw
+         }
+      }
+   }
+
    @Override
    public boolean moveMessage(final long messageID, final String 
otherQueueName) throws Exception {
       return moveMessage(messageID, otherQueueName, false);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 012b169bee..9f115e48a2 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -108,6 +108,7 @@ import static 
org.apache.activemq.artemis.core.message.openmbean.CompositeDataCo
 import static 
org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants.STRING_PROPERTIES;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -2236,6 +2237,109 @@ public class QueueControlTest extends 
ManagementTestBase {
       assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q2));
    }
 
+   /**
+    * Tests that a filtered retry moves only matching DLQ messages to the original queue and a null filter moves the rest.
+    */
+   @TestTemplate
+   public void testRetryMatchingMessages() throws Exception {
+      final SimpleString dla = SimpleString.of("DLA");
+      final SimpleString qName = SimpleString.of("q1");
+      final SimpleString adName = SimpleString.of("ad1");
+      final SimpleString dlq = SimpleString.of("DLQ1");
+      final String sampleText = "Put me on DLQ";
+
+      AddressSettings addressSettings = new 
AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla);
+      server.getAddressSettingsRepository().addMatch(adName.toString(), 
addressSettings);
+
+      
session.createQueue(QueueConfiguration.of(dlq).setAddress(dla).setDurable(durable));
+      
session.createQueue(QueueConfiguration.of(qName).setAddress(adName).setDurable(durable));
+
+      // Send message to queue.
+      ClientProducer producer = session.createProducer(adName);
+
+      producer.send(createTextMessage(session, sampleText + 
":red").putStringProperty("color", "red"));
+      producer.send(createTextMessage(session, sampleText + 
":green").putStringProperty("color", "green"));
+      producer.send(createTextMessage(session, sampleText + 
":blue").putStringProperty("color", "blue"));
+
+      session.start();
+
+      final LocalQueueBinding binding = (LocalQueueBinding) 
server.getPostOffice().getBinding(qName);
+      Queue q = binding.getQueue();
+      final LocalQueueBinding binding2 = (LocalQueueBinding) 
server.getPostOffice().getBinding(dlq);
+      Queue q2 = binding2.getQueue();
+
+      //Verify that original queue has a memory size greater than 0 and DLQ is 0
+      assertTrue(QueueImplTestAccessor.getQueueMemorySize(q) > 0);
+      assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q2));
+
+      // Read and rollback all messages to DLQ (null-check first so a missing message fails as an assertion, not an NPE)
+      ClientConsumer clientConsumer = session.createConsumer(qName);
+
+      ClientMessage clientMessage = clientConsumer.receive(500);
+      assertNotNull(clientMessage);
+      clientMessage.acknowledge();
+      assertEquals(sampleText + ":red", 
clientMessage.getBodyBuffer().readString());
+      session.rollback();
+      clientMessage = clientConsumer.receive(500);
+      assertNotNull(clientMessage);
+      clientMessage.acknowledge();
+      assertEquals(sampleText + ":green", 
clientMessage.getBodyBuffer().readString());
+      session.rollback();
+      clientMessage = clientConsumer.receive(500);
+      assertNotNull(clientMessage);
+      clientMessage.acknowledge();
+      assertEquals(sampleText + ":blue", 
clientMessage.getBodyBuffer().readString());
+      session.rollback();
+
+      assertNull(clientConsumer.receiveImmediate());
+
+      //Verify that original queue has a memory size of 0 and DLQ is greater 
than 0 after rollback
+      assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q));
+      assertTrue(QueueImplTestAccessor.getQueueMemorySize(q2) > 0);
+
+      QueueControl dlqQueueControl = createManagementControl(dla, dlq);
+      assertMessageMetrics(dlqQueueControl, 3, durable);
+
+      // Retry matching messages - i.e. they should go from DLQ to original 
Queue.
+      assertEquals(1, dlqQueueControl.retryMessages("color = 'green'"));
+
+      // Assert DLQ is not empty...
+      assertMessageMetrics(dlqQueueControl, 2, durable);
+
+      //Verify that original queue has a memory size of greater than 0 and DLQ 
still has data since the filter only matched one
+      assertTrue(QueueImplTestAccessor.getQueueMemorySize(q) > 0);
+      assertNotEquals(0, QueueImplTestAccessor.getQueueMemorySize(q2));
+
+      // .. and that the message is now on the original queue once more (acknowledge the retried message itself).
+      {
+         ClientMessage retriedMessage = clientConsumer.receive(500);
+         assertNotNull(retriedMessage);
+         retriedMessage.acknowledge();
+         assertEquals(sampleText + ":green", 
retriedMessage.getBodyBuffer().readString());
+      }
+
+      // Retry moving messages without a filter - i.e. all remaining should go 
from DLQ to original Queue.
+      assertEquals(2, dlqQueueControl.retryMessages(null));
+
+      // .. and that the messages are now on the original queue once more.
+      {
+         ClientMessage retriedMessage = clientConsumer.receive(500);
+         assertNotNull(retriedMessage);
+         retriedMessage.acknowledge();
+         assertEquals(sampleText + ":red", 
retriedMessage.getBodyBuffer().readString());
+         retriedMessage = clientConsumer.receive(500);
+         assertNotNull(retriedMessage);
+         retriedMessage.acknowledge();
+         assertEquals(sampleText + ":blue", 
retriedMessage.getBodyBuffer().readString());
+      }
+
+      clientConsumer.close();
+
+      //Verify that original queue and DLQ have a memory size of 0
+      assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q));
+      assertEquals(0, QueueImplTestAccessor.getQueueMemorySize(q2));
+   }
+
    /**
     * Test send to DLA while paging includes paged messages
     */
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index 5cea1efd54..d467bcf3b6 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -525,6 +525,11 @@ public class QueueControlUsingCoreTest extends 
QueueControlTest {
             return (Integer) proxy.invokeOperation(Integer.class, 
"retryMessages");
          }
 
+         @Override
+         public int retryMessages(String filter) throws Exception {
+            return (Integer) proxy.invokeOperation(Integer.class, 
"retryMessages", filter);
+         }
+
          @Override
          public int removeMessages(final String filter) throws Exception {
             return (Integer) proxy.invokeOperation(Integer.class, 
"removeMessages", filter);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to