This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 28f7eb7ee [AMQ-9217] Fix per-destination audits on IndividualDeadLetterStrategy
     new dd6118e68 Merge pull request #965 from mattrpav/AMQ-9217
28f7eb7ee is described below

commit 28f7eb7ee87c47e43cc3db11fcd550ef872327b3
Author: Matt Pavlovich <m...@hyte.io>
AuthorDate: Thu Feb 2 16:30:28 2023 -0600

    [AMQ-9217] Fix per-destination audits on IndividualDeadLetterStrategy
---
 .../region/policy/AbstractDeadLetterStrategy.java  |  22 ++--
 .../policy/IndividualDeadLetterStrategy.java       |  48 ++++++++
 .../region/policy/SharedDeadLetterStrategy.java    |  27 +++++
 .../broker/policy/IndividualDeadLetterTest.java    | 127 +++++++++++++++++++++
 4 files changed, 209 insertions(+), 15 deletions(-)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
index fa6532b33..82e07560e 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
@@ -31,13 +31,12 @@ public abstract class AbstractDeadLetterStrategy implements 
DeadLetterStrategy {
     private boolean processNonPersistent = false;
     private boolean processExpired = true;
     private boolean enableAudit = true;
-    private final ActiveMQMessageAudit messageAudit = new 
ActiveMQMessageAudit();
     private long expiration;
 
     @Override
     public void rollback(Message message) {
         if (message != null && this.enableAudit) {
-            messageAudit.rollback(message);
+            lookupActiveMQMessageAudit(message).rollback(message);
         }
     }
 
@@ -46,7 +45,7 @@ public abstract class AbstractDeadLetterStrategy implements 
DeadLetterStrategy {
         boolean result = false;
         if (message != null) {
             result = true;
-            if (enableAudit && messageAudit.isDuplicate(message)) {
+            if (enableAudit && 
lookupActiveMQMessageAudit(message).isDuplicate(message)) {
                 result = false;
                 LOG.debug("Not adding duplicate to DLQ: {}, dest: {}", 
message.getMessageId(), message.getDestination());
             }
@@ -108,20 +107,13 @@ public abstract class AbstractDeadLetterStrategy 
implements DeadLetterStrategy {
         this.expiration = expiration;
     }
 
-    public int getMaxProducersToAudit() {
-        return messageAudit.getMaximumNumberOfProducersToTrack();
-    }
+    public abstract int getMaxProducersToAudit();
 
-    public void setMaxProducersToAudit(int maxProducersToAudit) {
-        messageAudit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
-    }
+    public abstract void setMaxProducersToAudit(int maxProducersToAudit);
 
-    public void setMaxAuditDepth(int maxAuditDepth) {
-        messageAudit.setAuditDepth(maxAuditDepth);
-    }
+    public abstract void setMaxAuditDepth(int maxAuditDepth);
 
-    public int getMaxAuditDepth() {
-        return messageAudit.getAuditDepth();
-    }
+    public abstract int getMaxAuditDepth();
 
+    protected abstract ActiveMQMessageAudit lookupActiveMQMessageAudit(Message 
message);
 }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
index 1dfaa1566..3dd41ae0b 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.Subscription;
@@ -23,6 +24,7 @@ import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.util.LRUCache;
 
 /**
  * A {@link DeadLetterStrategy} where each destination has its own individual
@@ -40,6 +42,10 @@ public class IndividualDeadLetterStrategy extends 
AbstractDeadLetterStrategy {
     private boolean useQueueForQueueMessages = true;
     private boolean useQueueForTopicMessages = true;
     private boolean destinationPerDurableSubscriber;
+    private int maxAuditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
+    private int maxProducersToAudit = 
ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
+
+    private final LRUCache<String,ActiveMQMessageAudit> dedicatedMessageAudits 
= new LRUCache<>(10_000);
 
     public ActiveMQDestination getDeadLetterQueueFor(Message message, 
Subscription subscription) {
         if (message.getDestination().isQueue()) {
@@ -51,6 +57,13 @@ public class IndividualDeadLetterStrategy extends 
AbstractDeadLetterStrategy {
 
     // Properties
     // 
-------------------------------------------------------------------------
+    public int getMaxDestinationsToAudit() {
+        return dedicatedMessageAudits.getMaxCacheSize();
+    }
+
+    public void maxDestinationsToAudit(int maxDestinationsToAudit) {
+        this.dedicatedMessageAudits.setMaxCacheSize(maxDestinationsToAudit);
+    }
 
     public String getQueuePrefix() {
         return queuePrefix;
@@ -134,6 +147,26 @@ public class IndividualDeadLetterStrategy extends 
AbstractDeadLetterStrategy {
         this.destinationPerDurableSubscriber = destinationPerDurableSubscriber;
     }
 
+    @Override
+    public int getMaxProducersToAudit() {
+        return this.maxProducersToAudit;
+    }
+
+    @Override
+    public void setMaxProducersToAudit(int maxProducersToAudit) {
+        this.maxProducersToAudit = maxProducersToAudit;
+    }
+
+    @Override
+    public void setMaxAuditDepth(int maxAuditDepth) {
+        this.maxAuditDepth = maxAuditDepth;
+    }
+
+    @Override
+    public int getMaxAuditDepth() {
+        return this.maxAuditDepth;
+    }
+
     // Implementation methods
     // 
-------------------------------------------------------------------------
     protected ActiveMQDestination createDestination(Message message,
@@ -168,4 +201,19 @@ public class IndividualDeadLetterStrategy extends 
AbstractDeadLetterStrategy {
         }
     }
 
+    @Override
+    protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message) 
{
+        ActiveMQMessageAudit messageAudit;
+
+        synchronized(dedicatedMessageAudits) {
+            messageAudit = 
dedicatedMessageAudits.get(message.getDestination().getQualifiedName());
+
+            if(messageAudit == null) {
+                messageAudit = new ActiveMQMessageAudit(getMaxAuditDepth(), 
getMaxProducersToAudit());
+                
dedicatedMessageAudits.put(message.getDestination().getQualifiedName(), 
messageAudit);
+            }
+
+            return messageAudit;
+        }
+    }
 }
\ No newline at end of file
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
index 41f1f1028..8a78e83cf 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -35,6 +36,7 @@ public class SharedDeadLetterStrategy extends 
AbstractDeadLetterStrategy {
     public static final String DEFAULT_DEAD_LETTER_QUEUE_NAME = "ActiveMQ.DLQ";
 
     private ActiveMQDestination deadLetterQueue = new 
ActiveMQQueue(DEFAULT_DEAD_LETTER_QUEUE_NAME);
+    private final ActiveMQMessageAudit messageAudit = new 
ActiveMQMessageAudit();
 
     public ActiveMQDestination getDeadLetterQueueFor(Message message, 
Subscription subscription) {
         return deadLetterQueue;
@@ -48,4 +50,29 @@ public class SharedDeadLetterStrategy extends 
AbstractDeadLetterStrategy {
         this.deadLetterQueue = deadLetterQueue;
     }
 
+    @Override
+    public int getMaxProducersToAudit() {
+        return messageAudit.getMaximumNumberOfProducersToTrack();
+    }
+
+    @Override
+    public void setMaxProducersToAudit(int maxProducersToAudit) {
+        messageAudit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
+    }
+
+    @Override
+    public void setMaxAuditDepth(int maxAuditDepth) {
+        messageAudit.setAuditDepth(maxAuditDepth);
+    }
+
+    @Override
+    public int getMaxAuditDepth() {
+        return messageAudit.getAuditDepth();
+    }
+
+    @Override
+    protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message) 
{
+        return messageAudit;
+    }
+
 }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
index 5dc4ae7a9..2b51ffa95 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
@@ -16,22 +16,31 @@
  */
 package org.apache.activemq.broker.policy;
 
+import java.util.Arrays;
 import java.util.Enumeration;
+import java.util.Set;
 
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
 import javax.jms.Queue;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
+import org.apache.activemq.broker.region.virtual.CompositeQueue;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,12 +57,37 @@ public class IndividualDeadLetterTest extends 
DeadLetterTest {
         strategy.setProcessNonPersistent(true);
         policy.setDeadLetterStrategy(strategy);
 
+        PolicyEntry indvAuditPolicy = new PolicyEntry();
+        IndividualDeadLetterStrategy indvAuditDlqStrategy = new 
IndividualDeadLetterStrategy();
+        indvAuditDlqStrategy.setEnableAudit(true);
+        indvAuditPolicy.setDeadLetterStrategy(indvAuditDlqStrategy);
+
+        PolicyEntry shrAuditPolicy = new PolicyEntry();
+        SharedDeadLetterStrategy shrAuditDlqStrategy = new 
SharedDeadLetterStrategy();
+        shrAuditDlqStrategy.setEnableAudit(true);
+        shrAuditPolicy.setDeadLetterStrategy(shrAuditDlqStrategy);
+
         PolicyMap pMap = new PolicyMap();
         pMap.put(new ActiveMQQueue(getDestinationString()), policy);
         pMap.put(new ActiveMQTopic(getDestinationString()), policy);
+        pMap.put(new ActiveMQQueue(getDestinationString() + ".INDV.>"), 
indvAuditPolicy);
+        pMap.put(new ActiveMQQueue(getDestinationString() + ".SHR.>"), 
shrAuditPolicy);
 
         broker.setDestinationPolicy(pMap);
 
+        CompositeQueue indvAuditCompQueue = new CompositeQueue();
+        indvAuditCompQueue.setName(getDestinationString() + ".INDV.A");
+        indvAuditCompQueue.setForwardOnly(true);
+        indvAuditCompQueue.setForwardTo(Arrays.asList(new 
ActiveMQQueue(getDestinationString() + ".INDV.B"), new 
ActiveMQQueue(getDestinationString() + ".INDV.C")));
+
+        CompositeQueue sharedAuditCompQueue = new CompositeQueue();
+        sharedAuditCompQueue.setName(getDestinationString() + ".SHR.A");
+        sharedAuditCompQueue.setForwardOnly(true);
+        sharedAuditCompQueue.setForwardTo(Arrays.asList(new 
ActiveMQQueue(getDestinationString() + ".SHR.B"), new 
ActiveMQQueue(getDestinationString() + ".SHR.C")));
+
+        VirtualDestinationInterceptor vdi = new 
VirtualDestinationInterceptor();
+        vdi.setVirtualDestinations(new VirtualDestination[] { 
indvAuditCompQueue, sharedAuditCompQueue });
+        broker.setDestinationInterceptors(new VirtualDestinationInterceptor[] 
{vdi});
         return broker;
     }
 
@@ -99,6 +133,99 @@ public class IndividualDeadLetterTest extends 
DeadLetterTest {
         assertNull("The message shouldn't be sent to another DLQ", 
testConsumer.receive(1000));
     }
 
+    // AMQ-9217
+    public void testPerDestinationAuditDefault() throws Exception {
+        ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
+        rollbackCount = 
amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
+
+        connection.start();
+        session = connection.createSession(transactedMode, acknowledgeMode);
+        MessageProducer messageProducerA = 
session.createProducer(session.createQueue(getDestinationString() + ".INDV.A"));
+        
messageProducerA.send(session.createTextMessage("testPerDestinationAuditEnabled"));
+        session.commit();
+
+        for(String destName : Set.of(getDestinationString() + ".INDV.B", 
getDestinationString() + ".INDV.C")) {
+            for (int i = 0; i < rollbackCount; i++) {
+                MessageConsumer indvConsumer = 
session.createConsumer(session.createQueue(destName));
+                Message message = indvConsumer.receive(5000);
+                assertNotNull("No message received: ", message);
+
+                session.rollback();
+                LOG.info("Rolled back: " + rollbackCount + " times");
+                indvConsumer.close();
+            }
+        }
+
+        QueueViewMBean a = getProxyToQueue(getDestinationString() + ".INDV.A");
+        assertNotNull(a);
+        assertTrue(Wait.waitFor(() -> a.getEnqueueCount() == 0l, 3000, 250));
+        assertTrue(Wait.waitFor(() -> a.getQueueSize() == 0l, 3000, 250));
+
+        QueueViewMBean b = getProxyToQueue(getDestinationString() + ".INDV.B");
+        assertNotNull(b);
+        assertTrue(Wait.waitFor(() -> b.getEnqueueCount() == 1l, 3000, 250));
+        assertTrue(Wait.waitFor(() -> b.getQueueSize() == 0l, 3000, 250));
+
+        QueueViewMBean c = getProxyToQueue(getDestinationString() + ".INDV.C");
+        assertNotNull(c);
+        assertTrue(Wait.waitFor(() -> c.getEnqueueCount() == 1l, 3000, 250));
+        assertTrue(Wait.waitFor(() -> c.getQueueSize() == 0l, 3000, 250));
+
+        QueueViewMBean bDlq = getProxyToQueue("ActiveMQ.DLQ.Queue." + 
getDestinationString() + ".INDV.B");
+        assertNotNull(bDlq);
+        assertTrue(Wait.waitFor(() -> bDlq.getEnqueueCount() == 1l, 3000, 
250));
+        assertTrue(Wait.waitFor(() -> bDlq.getQueueSize() == 1l, 3000, 250));
+
+        QueueViewMBean cDlq = getProxyToQueue("ActiveMQ.DLQ.Queue." + 
getDestinationString() + ".INDV.C");
+        assertNotNull(cDlq);
+        assertTrue(Wait.waitFor(() -> cDlq.getEnqueueCount() == 1, 3000, 250));
+        assertTrue(Wait.waitFor(() -> cDlq.getQueueSize() == 1, 3000, 250));
+    }
+
+    public void testSharedDestinationAuditDropsMessages() throws Exception {
+        ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
+        rollbackCount = 
amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
+
+        connection.start();
+        session = connection.createSession(transactedMode, acknowledgeMode);
+        MessageProducer messageProducerA = 
session.createProducer(session.createQueue(getDestinationString() + ".SHR.A"));
+        
messageProducerA.send(session.createTextMessage("testSharedDestinationAuditDropsMessages"));
+        session.commit();
+
+        for(String destName : Set.of(getDestinationString() + ".SHR.B", 
getDestinationString() + ".SHR.C")) {
+            for (int i = 0; i < rollbackCount; i++) {
+                MessageConsumer shrConsumer = 
session.createConsumer(session.createQueue(destName));
+                Message message = shrConsumer.receive(5000);
+                assertNotNull("No message received: ", message);
+
+                session.rollback();
+                LOG.info("Rolled back: " + rollbackCount + " times");
+                shrConsumer.close();
+            }
+        }
+
+        QueueViewMBean a = getProxyToQueue(getDestinationString() + ".SHR.A");
+        assertNotNull(a);
+        assertTrue(Wait.waitFor(() -> a.getEnqueueCount() == 0l, 3000, 250));
+        assertTrue(Wait.waitFor(() -> a.getQueueSize() == 0l, 3000, 250));
+
+        QueueViewMBean b = getProxyToQueue(getDestinationString() + ".SHR.B");
+        assertNotNull(b);
+        assertTrue(Wait.waitFor(() -> b.getEnqueueCount() == 1l, 3000, 250));
+        assertTrue(Wait.waitFor(() -> b.getQueueSize() == 0l, 3000, 250));
+
+        QueueViewMBean c = getProxyToQueue(getDestinationString() + ".SHR.C");
+        assertNotNull(c);
+        assertTrue(Wait.waitFor(() -> c.getEnqueueCount() == 1l, 3000, 250));
+        assertTrue(Wait.waitFor(() -> c.getQueueSize() == 0l, 3000, 250));
+
+        // Only 1 message in 1 DLQ means a message was dropped due to 
shared message audit
+        QueueViewMBean sharedDlq = getProxyToQueue("ActiveMQ.DLQ");
+        assertNotNull(sharedDlq);
+        assertTrue(Wait.waitFor(() -> sharedDlq.getEnqueueCount() == 1, 3000, 
250));
+        assertTrue(Wait.waitFor(() -> sharedDlq.getQueueSize() == 1, 3000, 
250));
+    }
+
     protected void browseDlq() throws Exception {
         Enumeration<?> messages = dlqBrowser.getEnumeration();
         while (messages.hasMoreElements()) {

Reply via email to