This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.19.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.19.x by this push:
     new 1f1ebbc83d Fire message discarded advisory on format errors (#2177) 
(#2180)
1f1ebbc83d is described below

commit 1f1ebbc83dfb5afa8091a91c2bc7abf0992555c1
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Thu Jul 2 17:11:40 2026 -0400

    Fire message discarded advisory on format errors (#2177) (#2180)
    
    In #2136 the broker was updated to handle corrupt messages and to send
    them to the DLQ and discarding them. This change further improves on
    that by sending the message discarded advisory, if it has been enabled
    on the destination. By default the discard advisory will be off.
    
    (cherry picked from commit c0892594cb3edfab2dd42ad6a6aef9c6f0cdf11e)
---
 .../activemq/broker/TransportConnection.java       | 12 ++++++++++
 .../activemq/broker/region/BaseDestination.java    | 11 ++++++---
 .../org/apache/activemq/broker/region/Queue.java   |  8 ++++---
 ...ActiveMQMessageFormatExceptionSelectorTest.java | 12 ++++++++++
 .../activemq/command/MessageCompressionTest.java   | 26 ++++++++++++++++++++--
 5 files changed, 61 insertions(+), 8 deletions(-)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 257083df91..116a2554d4 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -37,6 +37,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.transaction.xa.XAResource;
 
 import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.region.BaseDestination;
 import org.apache.activemq.broker.region.ConnectionStatistics;
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -1048,6 +1049,17 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
                 ack.setDestination(messageDispatch.getDestination());
                 ack.setMessageID(messageDispatch.getMessage().getMessageId());
                 broker.acknowledge(consumerExchange, ack);
+
+                // Send discarded advisory (if enabled). This calls directly 
on the broker
+                // and not messageDiscarded() on the destination itself so 
that it will
+                // not send to the DLQ, because that is eventually handled by 
the subs
+                // when acking with a poison ack above
+                final Message.MessageDestination dest = 
messageDispatch.getMessage()
+                        .getRegionDestination();
+                if (dest instanceof BaseDestination && ((BaseDestination) 
dest).isAdvisoryForDiscardingMessages()) {
+                    
broker.messageDiscarded(consumerExchange.getConnectionContext(),
+                            consumerExchange.getSubscription(), 
messageDispatch.getMessage());
+                }
             }
         } catch (Exception ex) {
             TRANSPORTLOG.warn("{} could not acknowledge and send message to 
the DLQ after"
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 9e09e15bb0..192d1d0fcb 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -576,15 +576,20 @@ public abstract class BaseDestination implements 
Destination {
      */
     @Override
     public void messageDiscarded(ConnectionContext context, Subscription sub, 
MessageReference messageReference) {
+        final ConsumerInfo info = sub != null ? sub.getConsumerInfo() : null;
+        final String poisonCause = info != null ? "Subscription discard. ID:" 
+ info.getConsumerId() : "Message discarded";
+        messageDiscarded(context, sub, messageReference, new 
Throwable(poisonCause));
+    }
+
+    protected void messageDiscarded(ConnectionContext context, Subscription 
sub, MessageReference messageReference,
+            Throwable cause) {
         if (advisoryForDiscardingMessages) {
             broker.messageDiscarded(context, sub, messageReference);
         }
         // We need to send to the DLQ because broker.messageDiscarded() will 
not do that because it's
         // optionally enabled and off by default. This is different than 
expiration handling because
         // broker.messageExpired() does send to the DLQ
-        final ConsumerInfo info = sub != null ? sub.getConsumerInfo() : null;
-        final String poisonCause = info != null ? "Subscription discard. ID:" 
+ info.getConsumerId() : "Message discarded";
-        broker.sendToDeadLetterQueue(context, messageReference, sub, new 
Throwable(poisonCause));
+        broker.sendToDeadLetterQueue(context, messageReference, sub, cause);
     }
 
     /**
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 6bb9f67b54..5f0a86cf62 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1888,14 +1888,16 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
         }
     }
 
-    protected void removeAndSendToDlq(ConnectionContext c, 
QueueMessageReference r, Exception e) throws IOException {
+    private void discardAndSendToDlq(ConnectionContext c, 
QueueMessageReference r, Exception e) throws IOException {
         MessageAck ack = new MessageAck();
         ack.setAckType(MessageAck.POISON_ACK_TYPE);
         ack.setPoisonCause(e);
         ack.setDestination(destination);
         ack.setMessageID(r.getMessageId());
         removeMessage(c, null, r, ack);
-        broker.getRoot().sendToDeadLetterQueue(c, r.getMessage(), null, e);
+        // this.messageDiscarded() sends to the DLQ with the poison cause
+        // as well as sending the discarded advisory (if enabled).
+        this.messageDiscarded(c, null, r, e);
     }
 
     protected void removeMessage(ConnectionContext c, Subscription subs, 
QueueMessageReference r) throws IOException {
@@ -2414,7 +2416,7 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
             throws IOException {
         if (messageFormatErrors != null) {
             for (Entry<QueueMessageReference, ActiveMQMessageFormatException> 
error : messageFormatErrors.entrySet()) {
-                removeAndSendToDlq(broker.getAdminConnectionContext(), 
error.getKey(),
+                discardAndSendToDlq(broker.getAdminConnectionContext(), 
error.getKey(),
                         error.getValue());
             }
         }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQMessageFormatExceptionSelectorTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQMessageFormatExceptionSelectorTest.java
index a12d858e71..4bb27574a4 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQMessageFormatExceptionSelectorTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQMessageFormatExceptionSelectorTest.java
@@ -67,10 +67,12 @@ public class ActiveMQMessageFormatExceptionSelectorTest {
     private URI clientUri;
     private BrokerService brokerService;
     private final AtomicInteger dlqCount = new AtomicInteger();
+    private final AtomicInteger discardCount = new AtomicInteger();
 
     @Before
     public void setUp() throws Exception {
         dlqCount.set(0);
+        discardCount.set(0);
         startBroker();
     }
 
@@ -125,6 +127,7 @@ public class ActiveMQMessageFormatExceptionSelectorTest {
             assertEquals(100, 
destination.getDestinationStatistics().getMessages().getCount());
             assertTrue(destination.getMemoryUsage().getUsage() > 0);
             assertEquals(0, dlqCount.get());
+            assertEquals(0, discardCount.get());
             assertEquals(100, destination.getMessageStore().getMessageCount());
         }
     }
@@ -137,6 +140,7 @@ public class ActiveMQMessageFormatExceptionSelectorTest {
         // corrupted it makes sense to just remove with the first error and
         // send to the DLQ
         assertEquals(20, dlqCount.get());
+        assertEquals(20, discardCount.get());
     }
 
     @Test(timeout = 30000)
@@ -358,6 +362,7 @@ public class ActiveMQMessageFormatExceptionSelectorTest {
         brokerService = new BrokerService();
         PolicyEntry policy = new PolicyEntry();
         policy.setSendAdvisoryIfNoConsumers(true);
+        policy.setAdvisoryForDiscardingMessages(true);
         PolicyMap pMap = new PolicyMap();
         pMap.setDefaultEntry(policy);
         brokerService.setDestinationPolicy(pMap);
@@ -380,6 +385,13 @@ public class ActiveMQMessageFormatExceptionSelectorTest {
                         return super.sendToDeadLetterQueue(context, 
messageReference,
                                 subscription, poisonCause);
                     }
+
+                    @Override
+                    public void messageDiscarded(ConnectionContext context, 
Subscription sub,
+                            MessageReference messageReference) {
+                        discardCount.getAndIncrement();
+                        super.messageDiscarded(context, sub, messageReference);
+                    }
                 };
             }
         }});
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
index 900e827915..1f67efb52b 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
@@ -43,6 +43,8 @@ import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
 import 
org.apache.activemq.util.MarshallingSupport.ActiveMQUnmarshalEOFException;
 import 
org.apache.activemq.util.MarshallingSupport.MaxInflatedDataSizeExceededException;
 import org.apache.activemq.util.Wait;
@@ -69,13 +71,20 @@ public class MessageCompressionTest {
     private String connectionUri;
     private final AtomicBoolean throwMaxInflatedException = new 
AtomicBoolean(false);
     private final AtomicBoolean sentToDlq = new AtomicBoolean(false);
+    private final AtomicBoolean discarded = new AtomicBoolean(false);
 
     @Before
     public void setUp() throws Exception {
         throwMaxInflatedException.set(false);
         sentToDlq.set(false);
+        discarded.set(false);
 
         broker = new BrokerService();
+        PolicyEntry policy = new PolicyEntry();
+        policy.setAdvisoryForDiscardingMessages(true);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(pMap);
         broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
             @Override
             public Broker installPlugin(Broker broker) {
@@ -102,6 +111,13 @@ public class MessageCompressionTest {
                         return super.sendToDeadLetterQueue(context, 
messageReference,
                                 subscription, poisonCause);
                     }
+
+                    @Override
+                    public void messageDiscarded(ConnectionContext context, 
Subscription sub,
+                            MessageReference messageReference) {
+                        discarded.set(true);
+                        super.messageDiscarded(context, sub, messageReference);
+                    }
                 };
             }
         }});
@@ -180,10 +196,13 @@ public class MessageCompressionTest {
         assertTrue(Wait.waitFor(() -> broker.getDestination(queue)
                 .getDestinationStatistics().getMessages().getCount() == 1, 
1000, 10));
         assertFalse(sentToDlq.get());
+        assertFalse(discarded.get());
 
-        // simulate a decompression error
+        // simulate a decompression error. This should cause an error on
+        // dispatch inside TransportConnection
         // this should poison ack and DLQ and we shouldn't get the message
-        // but the connection should still be open
+        // but the connection should still be open. The message will also call
+        // the discard callback because advisoryForDiscardingMessages is 
enabled
         this.throwMaxInflatedException.set(true);
 
         ActiveMQConnection con2 = (ActiveMQConnection) 
factory.createConnection();
@@ -196,15 +215,18 @@ public class MessageCompressionTest {
         assertTrue(Wait.waitFor(() -> broker.getDestination(queue)
                 .getDestinationStatistics().getMessages().getCount() == 0, 
500, 10));
         assertTrue(sentToDlq.get());
+        assertTrue(discarded.get());
 
         // no longer throw an exception
         this.throwMaxInflatedException.set(false);
         sentToDlq.set(false);
+        discarded.set(false);
 
         // exception has been disabled so we should receive again on the same 
connection
         producer.send(bytesMessage);
         assertNotNull(consumer.receive(1000));
         assertFalse(sentToDlq.get());
+        assertFalse(discarded.get());
 
         con1.close();
         con2.close();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to