This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch activemq-5.19.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.19.x by this push:
new 1f1ebbc83d Fire message discarded advisory on format errors (#2177)
(#2180)
1f1ebbc83d is described below
commit 1f1ebbc83dfb5afa8091a91c2bc7abf0992555c1
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Thu Jul 2 17:11:40 2026 -0400
Fire message discarded advisory on format errors (#2177) (#2180)
In #2136 the broker was updated to handle corrupt messages and to send
them to the DLQ and discarding them. This change further improves on
that by sending the message discarded advisory, if it has been enabled
on the destination. By default the discard advisory will be off.
(cherry picked from commit c0892594cb3edfab2dd42ad6a6aef9c6f0cdf11e)
---
.../activemq/broker/TransportConnection.java | 12 ++++++++++
.../activemq/broker/region/BaseDestination.java | 11 ++++++---
.../org/apache/activemq/broker/region/Queue.java | 8 ++++---
...ActiveMQMessageFormatExceptionSelectorTest.java | 12 ++++++++++
.../activemq/command/MessageCompressionTest.java | 26 ++++++++++++++++++++--
5 files changed, 61 insertions(+), 8 deletions(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 257083df91..116a2554d4 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -37,6 +37,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.transaction.xa.XAResource;
import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQDestination;
@@ -1048,6 +1049,17 @@ public class TransportConnection implements Connection,
Task, CommandVisitor {
ack.setDestination(messageDispatch.getDestination());
ack.setMessageID(messageDispatch.getMessage().getMessageId());
broker.acknowledge(consumerExchange, ack);
+
+ // Send discarded advisory (if enabled). This calls directly
on the broker
+ // and not messageDiscarded() on the destination itself so
that it will
+ // not send to the DLQ, because that is eventually handled by
the subs
+ // when acking with a poison ack above
+ final Message.MessageDestination dest =
messageDispatch.getMessage()
+ .getRegionDestination();
+ if (dest instanceof BaseDestination && ((BaseDestination)
dest).isAdvisoryForDiscardingMessages()) {
+
broker.messageDiscarded(consumerExchange.getConnectionContext(),
+ consumerExchange.getSubscription(),
messageDispatch.getMessage());
+ }
}
} catch (Exception ex) {
TRANSPORTLOG.warn("{} could not acknowledge and send message to
the DLQ after"
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 9e09e15bb0..192d1d0fcb 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -576,15 +576,20 @@ public abstract class BaseDestination implements
Destination {
*/
@Override
public void messageDiscarded(ConnectionContext context, Subscription sub,
MessageReference messageReference) {
+ final ConsumerInfo info = sub != null ? sub.getConsumerInfo() : null;
+ final String poisonCause = info != null ? "Subscription discard. ID:"
+ info.getConsumerId() : "Message discarded";
+ messageDiscarded(context, sub, messageReference, new
Throwable(poisonCause));
+ }
+
+ protected void messageDiscarded(ConnectionContext context, Subscription
sub, MessageReference messageReference,
+ Throwable cause) {
if (advisoryForDiscardingMessages) {
broker.messageDiscarded(context, sub, messageReference);
}
// We need to send to the DLQ because broker.messageDiscarded() will
not do that because it's
// optionally enabled and off by default. This is different than
expiration handling because
// broker.messageExpired() does send to the DLQ
- final ConsumerInfo info = sub != null ? sub.getConsumerInfo() : null;
- final String poisonCause = info != null ? "Subscription discard. ID:"
+ info.getConsumerId() : "Message discarded";
- broker.sendToDeadLetterQueue(context, messageReference, sub, new
Throwable(poisonCause));
+ broker.sendToDeadLetterQueue(context, messageReference, sub, cause);
}
/**
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 6bb9f67b54..5f0a86cf62 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1888,14 +1888,16 @@ public class Queue extends BaseDestination implements
Task, UsageListener, Index
}
}
- protected void removeAndSendToDlq(ConnectionContext c,
QueueMessageReference r, Exception e) throws IOException {
+ private void discardAndSendToDlq(ConnectionContext c,
QueueMessageReference r, Exception e) throws IOException {
MessageAck ack = new MessageAck();
ack.setAckType(MessageAck.POISON_ACK_TYPE);
ack.setPoisonCause(e);
ack.setDestination(destination);
ack.setMessageID(r.getMessageId());
removeMessage(c, null, r, ack);
- broker.getRoot().sendToDeadLetterQueue(c, r.getMessage(), null, e);
+ // this.messageDiscarded() sends to the DLQ with the poison cause
+ // as well as sending the discarded advisory (if enabled).
+ this.messageDiscarded(c, null, r, e);
}
protected void removeMessage(ConnectionContext c, Subscription subs,
QueueMessageReference r) throws IOException {
@@ -2414,7 +2416,7 @@ public class Queue extends BaseDestination implements
Task, UsageListener, Index
throws IOException {
if (messageFormatErrors != null) {
for (Entry<QueueMessageReference, ActiveMQMessageFormatException>
error : messageFormatErrors.entrySet()) {
- removeAndSendToDlq(broker.getAdminConnectionContext(),
error.getKey(),
+ discardAndSendToDlq(broker.getAdminConnectionContext(),
error.getKey(),
error.getValue());
}
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQMessageFormatExceptionSelectorTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQMessageFormatExceptionSelectorTest.java
index a12d858e71..4bb27574a4 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQMessageFormatExceptionSelectorTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQMessageFormatExceptionSelectorTest.java
@@ -67,10 +67,12 @@ public class ActiveMQMessageFormatExceptionSelectorTest {
private URI clientUri;
private BrokerService brokerService;
private final AtomicInteger dlqCount = new AtomicInteger();
+ private final AtomicInteger discardCount = new AtomicInteger();
@Before
public void setUp() throws Exception {
dlqCount.set(0);
+ discardCount.set(0);
startBroker();
}
@@ -125,6 +127,7 @@ public class ActiveMQMessageFormatExceptionSelectorTest {
assertEquals(100,
destination.getDestinationStatistics().getMessages().getCount());
assertTrue(destination.getMemoryUsage().getUsage() > 0);
assertEquals(0, dlqCount.get());
+ assertEquals(0, discardCount.get());
assertEquals(100, destination.getMessageStore().getMessageCount());
}
}
@@ -137,6 +140,7 @@ public class ActiveMQMessageFormatExceptionSelectorTest {
// corrupted it makes sense to just remove with the first error and
// send to the DLQ
assertEquals(20, dlqCount.get());
+ assertEquals(20, discardCount.get());
}
@Test(timeout = 30000)
@@ -358,6 +362,7 @@ public class ActiveMQMessageFormatExceptionSelectorTest {
brokerService = new BrokerService();
PolicyEntry policy = new PolicyEntry();
policy.setSendAdvisoryIfNoConsumers(true);
+ policy.setAdvisoryForDiscardingMessages(true);
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
brokerService.setDestinationPolicy(pMap);
@@ -380,6 +385,13 @@ public class ActiveMQMessageFormatExceptionSelectorTest {
return super.sendToDeadLetterQueue(context,
messageReference,
subscription, poisonCause);
}
+
+ @Override
+ public void messageDiscarded(ConnectionContext context,
Subscription sub,
+ MessageReference messageReference) {
+ discardCount.getAndIncrement();
+ super.messageDiscarded(context, sub, messageReference);
+ }
};
}
}});
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
index 900e827915..1f67efb52b 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
@@ -43,6 +43,8 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
import
org.apache.activemq.util.MarshallingSupport.ActiveMQUnmarshalEOFException;
import
org.apache.activemq.util.MarshallingSupport.MaxInflatedDataSizeExceededException;
import org.apache.activemq.util.Wait;
@@ -69,13 +71,20 @@ public class MessageCompressionTest {
private String connectionUri;
private final AtomicBoolean throwMaxInflatedException = new
AtomicBoolean(false);
private final AtomicBoolean sentToDlq = new AtomicBoolean(false);
+ private final AtomicBoolean discarded = new AtomicBoolean(false);
@Before
public void setUp() throws Exception {
throwMaxInflatedException.set(false);
sentToDlq.set(false);
+ discarded.set(false);
broker = new BrokerService();
+ PolicyEntry policy = new PolicyEntry();
+ policy.setAdvisoryForDiscardingMessages(true);
+ PolicyMap pMap = new PolicyMap();
+ pMap.setDefaultEntry(policy);
+ broker.setDestinationPolicy(pMap);
broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
@Override
public Broker installPlugin(Broker broker) {
@@ -102,6 +111,13 @@ public class MessageCompressionTest {
return super.sendToDeadLetterQueue(context,
messageReference,
subscription, poisonCause);
}
+
+ @Override
+ public void messageDiscarded(ConnectionContext context,
Subscription sub,
+ MessageReference messageReference) {
+ discarded.set(true);
+ super.messageDiscarded(context, sub, messageReference);
+ }
};
}
}});
@@ -180,10 +196,13 @@ public class MessageCompressionTest {
assertTrue(Wait.waitFor(() -> broker.getDestination(queue)
.getDestinationStatistics().getMessages().getCount() == 1,
1000, 10));
assertFalse(sentToDlq.get());
+ assertFalse(discarded.get());
- // simulate a decompression error
+ // simulate a decompression error. This should cause an error on
+ // dispatch inside TransportConnection
// this should poison ack and DLQ and we shouldn't get the message
- // but the connection should still be open
+ // but the connection should still be open. The message will also call
+ // the discard callback because advisoryForDiscardingMessages is
enabled
this.throwMaxInflatedException.set(true);
ActiveMQConnection con2 = (ActiveMQConnection)
factory.createConnection();
@@ -196,15 +215,18 @@ public class MessageCompressionTest {
assertTrue(Wait.waitFor(() -> broker.getDestination(queue)
.getDestinationStatistics().getMessages().getCount() == 0,
500, 10));
assertTrue(sentToDlq.get());
+ assertTrue(discarded.get());
// no longer throw an exception
this.throwMaxInflatedException.set(false);
sentToDlq.set(false);
+ discarded.set(false);
// exception has been disabled so we should receive again on the same
connection
producer.send(bytesMessage);
assertNotNull(consumer.receive(1000));
assertFalse(sentToDlq.get());
+ assertFalse(discarded.get());
con1.close();
con2.close();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact