This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new c0892594cb Fire message discarded advisory on format errors (#2177)
c0892594cb is described below
commit c0892594cb3edfab2dd42ad6a6aef9c6f0cdf11e
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Thu Jul 2 16:05:11 2026 -0400
Fire message discarded advisory on format errors (#2177)
In #2136 the broker was updated to handle corrupt messages and to send
them to the DLQ and discarding them. This change further improves on
that by sending the message discarded advisory, if it has been enabled
on the destination. By default the discard advisory will be off.
---
.../activemq/broker/TransportConnection.java | 12 ++++++++++
.../activemq/broker/region/BaseDestination.java | 11 ++++++---
.../org/apache/activemq/broker/region/Queue.java | 8 ++++---
...ActiveMQMessageFormatExceptionSelectorTest.java | 12 ++++++++++
.../activemq/command/MessageCompressionTest.java | 26 ++++++++++++++++++++--
5 files changed, 61 insertions(+), 8 deletions(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index b736008857..26a14c8cfe 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -37,6 +37,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.transaction.xa.XAResource;
import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQDestination;
@@ -1051,6 +1052,17 @@ public class TransportConnection implements Connection,
Task, CommandVisitor {
ack.setDestination(messageDispatch.getDestination());
ack.setMessageID(messageDispatch.getMessage().getMessageId());
broker.acknowledge(consumerExchange, ack);
+
+ // Send discarded advisory (if enabled). This calls directly
on the broker
+ // and not messageDiscarded() on the destination itself so
that it will
+ // not send to the DLQ, because that is eventually handled by
the subs
+ // when acking with a poison ack above
+ final Message.MessageDestination dest =
messageDispatch.getMessage()
+ .getRegionDestination();
+ if (dest instanceof BaseDestination && ((BaseDestination)
dest).isAdvisoryForDiscardingMessages()) {
+
broker.messageDiscarded(consumerExchange.getConnectionContext(),
+ consumerExchange.getSubscription(),
messageDispatch.getMessage());
+ }
}
} catch (Exception ex) {
TRANSPORTLOG.warn("{} could not acknowledge and send message to
the DLQ after"
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 3588902493..3b461ed649 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -576,15 +576,20 @@ public abstract class BaseDestination implements
Destination {
*/
@Override
public void messageDiscarded(ConnectionContext context, Subscription sub,
MessageReference messageReference) {
+ final ConsumerInfo info = sub != null ? sub.getConsumerInfo() : null;
+ final String poisonCause = info != null ? "Subscription discard. ID:"
+ info.getConsumerId() : "Message discarded";
+ messageDiscarded(context, sub, messageReference, new
Throwable(poisonCause));
+ }
+
+ protected void messageDiscarded(ConnectionContext context, Subscription
sub, MessageReference messageReference,
+ Throwable cause) {
if (advisoryForDiscardingMessages) {
broker.messageDiscarded(context, sub, messageReference);
}
// We need to send to the DLQ because broker.messageDiscarded() will
not do that because it's
// optionally enabled and off by default. This is different than
expiration handling because
// broker.messageExpired() does send to the DLQ
- final ConsumerInfo info = sub != null ? sub.getConsumerInfo() : null;
- final String poisonCause = info != null ? "Subscription discard. ID:"
+ info.getConsumerId() : "Message discarded";
- broker.sendToDeadLetterQueue(context, messageReference, sub, new
Throwable(poisonCause));
+ broker.sendToDeadLetterQueue(context, messageReference, sub, cause);
}
/**
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 074e2450ee..0f13bd0985 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1895,14 +1895,16 @@ public class Queue extends BaseDestination implements
Task, UsageListener, Index
}
}
- protected void removeAndSendToDlq(ConnectionContext c,
QueueMessageReference r, Exception e) throws IOException {
+ private void discardAndSendToDlq(ConnectionContext c,
QueueMessageReference r, Exception e) throws IOException {
MessageAck ack = new MessageAck();
ack.setAckType(MessageAck.POISON_ACK_TYPE);
ack.setPoisonCause(e);
ack.setDestination(destination);
ack.setMessageID(r.getMessageId());
removeMessage(c, null, r, ack);
- broker.getRoot().sendToDeadLetterQueue(c, r.getMessage(), null, e);
+ // this.messageDiscarded() sends to the DLQ with the poison cause
+ // as well as sending the discarded advisory (if enabled).
+ this.messageDiscarded(c, null, r, e);
}
protected void removeMessage(ConnectionContext c, Subscription subs,
QueueMessageReference r) throws IOException {
@@ -2421,7 +2423,7 @@ public class Queue extends BaseDestination implements
Task, UsageListener, Index
throws IOException {
if (messageFormatErrors != null) {
for (Entry<QueueMessageReference, ActiveMQMessageFormatException>
error : messageFormatErrors.entrySet()) {
- removeAndSendToDlq(broker.getAdminConnectionContext(),
error.getKey(),
+ discardAndSendToDlq(broker.getAdminConnectionContext(),
error.getKey(),
error.getValue());
}
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQMessageFormatExceptionSelectorTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQMessageFormatExceptionSelectorTest.java
index d68701e1be..72f4c1eb39 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQMessageFormatExceptionSelectorTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQMessageFormatExceptionSelectorTest.java
@@ -70,10 +70,12 @@ public class ActiveMQMessageFormatExceptionSelectorTest {
private URI clientUri;
private BrokerService brokerService;
private final AtomicInteger dlqCount = new AtomicInteger();
+ private final AtomicInteger discardCount = new AtomicInteger();
@Before
public void setUp() throws Exception {
dlqCount.set(0);
+ discardCount.set(0);
startBroker();
}
@@ -128,6 +130,7 @@ public class ActiveMQMessageFormatExceptionSelectorTest {
assertEquals(100,
destination.getDestinationStatistics().getMessages().getCount());
assertTrue(destination.getMemoryUsage().getUsage() > 0);
assertEquals(0, dlqCount.get());
+ assertEquals(0, discardCount.get());
assertEquals(100, destination.getMessageStore().getMessageCount());
}
}
@@ -140,6 +143,7 @@ public class ActiveMQMessageFormatExceptionSelectorTest {
// corrupted it makes sense to just remove with the first error and
// send to the DLQ
assertEquals(20, dlqCount.get());
+ assertEquals(20, discardCount.get());
}
@Test(timeout = 30000)
@@ -361,6 +365,7 @@ public class ActiveMQMessageFormatExceptionSelectorTest {
brokerService = new BrokerService();
PolicyEntry policy = new PolicyEntry();
policy.setSendAdvisoryIfNoConsumers(true);
+ policy.setAdvisoryForDiscardingMessages(true);
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
brokerService.setDestinationPolicy(pMap);
@@ -383,6 +388,13 @@ public class ActiveMQMessageFormatExceptionSelectorTest {
return super.sendToDeadLetterQueue(context,
messageReference,
subscription, poisonCause);
}
+
+ @Override
+ public void messageDiscarded(ConnectionContext context,
Subscription sub,
+ MessageReference messageReference) {
+ discardCount.getAndIncrement();
+ super.messageDiscarded(context, sub, messageReference);
+ }
};
}
}});
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
index e9ec549b57..b08def940b 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
@@ -43,6 +43,8 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.test.annotations.ParallelTest;
import
org.apache.activemq.util.MarshallingSupport.ActiveMQUnmarshalEOFException;
import
org.apache.activemq.util.MarshallingSupport.MaxInflatedDataSizeExceededException;
@@ -72,13 +74,20 @@ public class MessageCompressionTest {
private String connectionUri;
private final AtomicBoolean throwMaxInflatedException = new
AtomicBoolean(false);
private final AtomicBoolean sentToDlq = new AtomicBoolean(false);
+ private final AtomicBoolean discarded = new AtomicBoolean(false);
@Before
public void setUp() throws Exception {
throwMaxInflatedException.set(false);
sentToDlq.set(false);
+ discarded.set(false);
broker = new BrokerService();
+ PolicyEntry policy = new PolicyEntry();
+ policy.setAdvisoryForDiscardingMessages(true);
+ PolicyMap pMap = new PolicyMap();
+ pMap.setDefaultEntry(policy);
+ broker.setDestinationPolicy(pMap);
broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
@Override
public Broker installPlugin(Broker broker) {
@@ -105,6 +114,13 @@ public class MessageCompressionTest {
return super.sendToDeadLetterQueue(context,
messageReference,
subscription, poisonCause);
}
+
+ @Override
+ public void messageDiscarded(ConnectionContext context,
Subscription sub,
+ MessageReference messageReference) {
+ discarded.set(true);
+ super.messageDiscarded(context, sub, messageReference);
+ }
};
}
}});
@@ -183,10 +199,13 @@ public class MessageCompressionTest {
assertTrue(Wait.waitFor(() -> broker.getDestination(queue)
.getDestinationStatistics().getMessages().getCount() == 1,
1000, 10));
assertFalse(sentToDlq.get());
+ assertFalse(discarded.get());
- // simulate a decompression error
+ // simulate a decompression error. This should cause an error on
+ // dispatch inside TransportConnection
// this should poison ack and DLQ and we shouldn't get the message
- // but the connection should still be open
+ // but the connection should still be open. The message will also call
+ // the discard callback because advisoryForDiscardingMessages is
enabled
this.throwMaxInflatedException.set(true);
ActiveMQConnection con2 = (ActiveMQConnection)
factory.createConnection();
@@ -199,15 +218,18 @@ public class MessageCompressionTest {
assertTrue(Wait.waitFor(() -> broker.getDestination(queue)
.getDestinationStatistics().getMessages().getCount() == 0,
500, 10));
assertTrue(sentToDlq.get());
+ assertTrue(discarded.get());
// no longer throw an exception
this.throwMaxInflatedException.set(false);
sentToDlq.set(false);
+ discarded.set(false);
// exception has been disabled so we should receive again on the same
connection
producer.send(bytesMessage);
assertNotNull(consumer.receive(1000));
assertFalse(sentToDlq.get());
+ assertFalse(discarded.get());
con1.close();
con2.close();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact