This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new b20e61c7ef Improve the broker's handling of message corruption (#2136)
b20e61c7ef is described below

commit b20e61c7ef736a04ad07a934ea8de628550c6cf2
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Mon Jun 22 20:14:14 2026 -0400

    Improve the broker's handling of message corruption (#2136)
    
    This commit improves the broker's handling of messages that are corrupt
    and can't be read, such as when unmarshaling the properties or body fails.
    Currently, if there is an error, an IOException is triggered, which can lead
    to a client connection being closed. Furthermore, for queues, messages can
    get stuck and no new messages can be delivered.
    
    To improve things the following changes have been made:
    
    * The MarshallingSupport utility that is used to unmarshal message
      properties and bodies has improved validation to check for errors
      such as incorrectly encoded size values.
    * The broker will now handle message format errors both when messages
      are evaluated to add to subscriptions and during dispatch to consumers
      when the messages are already on a subscription.
    * The Stomp protocol converter was fixed to not auto ack or track acks
      until the message has been converted.
    * AMQP no longer swallows message format errors and will throw so the
      errors can be handled by the TransportConnection.
    
    All of these changes allow the broker to deal with corrupt messages and
    remove them (and possibly DLQ) vs causing the connections to close or
    the messages to block consumers forever in the queue case.
---
 .../transport/amqp/protocol/AmqpSender.java        |  13 +-
 .../transport/amqp/JMSInteroperabilityTest.java    | 172 ++++++++-
 .../activemq/broker/TransportConnection.java       |  49 ++-
 .../apache/activemq/broker/TransportConnector.java |  21 +-
 .../broker/region/AbstractSubscription.java        |  22 +-
 .../org/apache/activemq/broker/region/Queue.java   |  77 +++-
 .../broker/region/QueueBrowserSubscription.java    |   8 +
 .../activemq/broker/region/QueueSubscription.java  |  29 +-
 .../org/apache/activemq/broker/region/Topic.java   |  23 +-
 .../auto/nio/AutoNIOSSLTransportServer.java        |   3 +-
 .../activemq/ActiveMQMessageFormatException.java   |  26 ++
 .../activemq/command/ActiveMQMapMessage.java       |   2 +-
 .../activemq/command/ActiveMQStreamMessage.java    |   1 +
 .../activemq/command/ActiveMQTextMessage.java      |   2 +-
 .../FrameSizeLimitedFilterInputStream.java         |   3 +-
 .../org/apache/activemq/util/ExceptionUtils.java   |  96 +++++
 .../apache/activemq/util/MarshallingSupport.java   |  92 +++--
 .../activemq/util/MarshallingSupportTest.java      |   4 +-
 .../apache/activemq/transport/mqtt/MQTTTest.java   |  64 ++++
 .../transport/stomp/StompSubscription.java         |  30 +-
 .../apache/activemq/transport/stomp/StompTest.java |  67 ++++
 ...ActiveMQMessageFormatExceptionSelectorTest.java | 404 +++++++++++++++++++++
 .../apache/activemq/SyncSendPacketTimeoutTest.java |   3 +-
 .../activemq/command/ActiveMQMapMessageTest.java   |  33 ++
 .../activemq/command/ActiveMQMessageTest.java      |  23 ++
 .../command/ActiveMQObjectMessageTest.java         |  30 ++
 .../command/ActiveMQStreamMessageTest.java         |  30 ++
 .../activemq/command/ActiveMQTextMessageTest.java  |  30 ++
 28 files changed, 1266 insertions(+), 91 deletions(-)

diff --git 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 46846867ba..2fc8f4490f 100644
--- 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.ActiveMQMessageFormatException;
 import org.apache.activemq.broker.region.AbstractSubscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
@@ -41,6 +42,7 @@ import 
org.apache.activemq.transport.amqp.AmqpProtocolConverter;
 import org.apache.activemq.transport.amqp.message.AutoOutboundTransformer;
 import org.apache.activemq.transport.amqp.message.EncodedMessage;
 import org.apache.activemq.transport.amqp.message.OutboundTransformer;
+import org.apache.activemq.util.ExceptionUtils;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.Modified;
 import org.apache.qpid.proton.amqp.messaging.Outcome;
@@ -491,7 +493,16 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
                     }
                 }
             } catch (Exception e) {
-                LOG.warn("Error detected while flushing outbound messages: 
{}", e.getMessage());
+                // Check if there is a format error trying to convert the 
message. This error means the
+                // message can't be converted (corruption, etc). This will 
wrap and throw the message
+                // so it can be handled by the transport
+                ActiveMQMessageFormatException formatError = 
ExceptionUtils.createMessageFormatException(e);
+                if (formatError != null) {
+                    LOG.warn("Message conversion error while flushing outbound 
messages: {}", e.getMessage(), e);
+                    throw formatError;
+                } else {
+                    LOG.warn("Error detected while flushing outbound messages: 
{}", e.getMessage());
+                }
             }
         }
     }
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
index 2a2faf33ca..8bad4f39c7 100644
--- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
@@ -18,10 +18,12 @@ package org.apache.activemq.transport.amqp;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeFalse;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -30,20 +32,25 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-import jakarta.jms.BytesMessage;
-import jakarta.jms.Connection;
-import jakarta.jms.Destination;
-import jakarta.jms.MapMessage;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.ObjectMessage;
-import jakarta.jms.Session;
-import jakarta.jms.TextMessage;
+import jakarta.jms.*;
 
 import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.util.ByteSequenceData;
+import org.apache.activemq.util.Wait;
 import org.apache.qpid.proton.amqp.Binary;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -59,6 +66,7 @@ public class JMSInteroperabilityTest extends 
JMSClientTestSupport {
 
     protected static final Logger LOG = 
LoggerFactory.getLogger(JMSInteroperabilityTest.class);
 
+    private final AtomicBoolean sentToDlq = new AtomicBoolean(false);
     private final String transformer;
 
     @Parameters(name="Transformer->{0}")
@@ -70,6 +78,13 @@ public class JMSInteroperabilityTest extends 
JMSClientTestSupport {
             });
     }
 
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        sentToDlq.set(false);
+    }
+
     public JMSInteroperabilityTest(String transformer) {
         this.transformer = transformer;
     }
@@ -84,7 +99,26 @@ public class JMSInteroperabilityTest extends 
JMSClientTestSupport {
         return transformer;
     }
 
-    //----- Tests for property handling between protocols 
--------------------//
+    @Override
+    protected void addAdditionalPlugins(List<BrokerPlugin> plugins) throws 
Exception {
+        super.addAdditionalPlugins(plugins);
+        plugins.add(new BrokerPluginSupport() {
+            @Override
+            public Broker installPlugin(Broker broker) {
+                return new BrokerFilter(broker) {
+                    @Override
+                    public boolean sendToDeadLetterQueue(ConnectionContext 
context,
+                                                         MessageReference 
messageReference, Subscription subscription,
+                                                         Throwable 
poisonCause) {
+                        sentToDlq.set(true);
+                        return super.sendToDeadLetterQueue(context, 
messageReference,
+                                subscription, poisonCause);
+                    }
+                };
+            }
+        });
+    }
+//----- Tests for property handling between protocols --------------------//
 
     @SuppressWarnings("unchecked")
     @Test(timeout = 60000)
@@ -462,6 +496,122 @@ public class JMSInteroperabilityTest extends 
JMSClientTestSupport {
         }
     }
 
+    // The following tests for corruption will corrupt the headers or body
+    // to test that the AMQP protocol correctly passes the error during
+    // dispatch to allow the Transport Connection to properly handle
+    // with a poison ack so the message will be removed from the subscription.
+    // No selectors are set so these messages are only going to error
+    // during the protocol conversion.
+
+    @Test
+    public void testCorruptMessageErrorHeaders() throws Exception {
+        testCorruptMessageError(session -> {
+            ActiveMQBytesMessage message = (ActiveMQBytesMessage) 
session.createBytesMessage();
+            message.setStringProperty("testestt", "Testestt");
+            message.setStringProperty("prop2", "Testestt");
+            try {
+                message.beforeMarshall(null);
+                
ByteSequenceData.writeIntBig(message.getMarshalledProperties(), 1024 * 1024);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            return message;
+        }, false);
+    }
+
+    @Test
+    public void testCorruptMessageErrorMap() throws Exception {
+        testCorruptMessageError(session -> {
+            MapMessage message = session.createMapMessage();
+            message.setString("id", UUID.randomUUID().toString());
+            return message;
+        }, false);
+    }
+
+    // Check durable sub as well which is also a prefetch subscription so a
+    // poison ack will be handled the same way and DLQ
+    @Test
+    public void testCorruptMessageErrorMapDurableSub() throws Exception {
+        testCorruptMessageError(session -> {
+            MapMessage message = session.createMapMessage();
+            message.setString("id", UUID.randomUUID().toString());
+            return message;
+        }, true);
+    }
+
+    @Test
+    public void testCorruptMessageErrorText() throws Exception {
+        testCorruptMessageError(session -> {
+            TextMessage message = session.createTextMessage();
+            message.setText(UUID.randomUUID().toString());
+            return message;
+        }, false);
+    }
+
+    @Test
+    public void testCorruptMessageErrorTextDurableSub() throws Exception {
+        testCorruptMessageError(session -> {
+            TextMessage message = session.createTextMessage();
+            message.setText(UUID.randomUUID().toString());
+            return message;
+        }, true);
+    }
+
+
+    @Test
+    public void testCorruptMessageErrorStream() throws Exception {
+        testCorruptMessageError(session -> {
+            StreamMessage message = session.createStreamMessage();
+            message.writeBytes(UUID.randomUUID().toString().getBytes());
+            return message;
+        }, false);
+    }
+
+    private void testCorruptMessageError(MessageCreator messageCreator, 
boolean topic) throws Exception {
+        // Raw Transformer doesn't expand the body
+        assumeFalse(!transformer.equals("jms"));
+
+        try (Connection openwire = createJMSConnection(); Connection amqp = 
createConnection()) {
+            openwire.start();
+            amqp.start();
+            Session openwireSession = openwire.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Session amqpSession = amqp.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            ActiveMQDestination dest = topic ?
+                    (ActiveMQDestination)  
openwireSession.createTopic(getDestinationName()) :
+                    (ActiveMQDestination)  
openwireSession.createQueue(getDestinationName());
+            MessageProducer openwireProducer = 
openwireSession.createProducer(dest);
+            MessageConsumer amqpConsumer = topic ?
+                    amqpSession.createDurableSubscriber((Topic) dest, "sub") :
+                    amqpSession.createConsumer(dest);
+
+            // Create and send the Message
+            ActiveMQMessage outgoing = (ActiveMQMessage) 
messageCreator.create(openwireSession);
+            outgoing.storeContentAndClear();
+
+            // corrupt the buffer
+            // might be null if we are testing headers only
+            if (outgoing.getContent() != null && outgoing.getContent().length 
> 0) {
+                ByteSequenceData.writeIntBig(outgoing.getContent(), 1000);
+            }
+
+            openwireProducer.send(outgoing);
+
+            // Now try to consume the Message, should not be received
+            Message received = amqpConsumer.receive(2000);
+            assertNull(received);
+
+            // verify message is gone off the dest and went to the DLQ
+            assertTrue(Wait.waitFor(() -> brokerService.getDestination(dest)
+                    .getDestinationStatistics().getMessages().getCount() == 0, 
500, 10));
+            assertTrue(sentToDlq.get());
+        }
+    }
+
+    private interface MessageCreator {
+        Message create(Session session) throws JMSException;
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void testObjectMessageContainingList() throws Exception {
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index b6fe548857..b736008857 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -26,9 +26,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -98,6 +96,8 @@ import org.apache.activemq.transport.ResponseCorrelator;
 import org.apache.activemq.transport.TransmitCallback;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportDisposedIOException;
+import org.apache.activemq.util.ExceptionUtils;
+import org.apache.activemq.ActiveMQMessageFormatException;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.MarshallingSupport;
 import org.apache.activemq.util.NetworkBridgeUtils;
@@ -105,7 +105,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
-import jakarta.jms.ResourceAllocationException;
 
 public class TransportConnection implements Connection, Task, CommandVisitor {
     private static final Logger LOG = 
LoggerFactory.getLogger(TransportConnection.class);
@@ -996,6 +995,18 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
                 if (sub != null) {
                     sub.onFailure();
                 }
+                // Check if this is a type of message format error which 
indicates the
+                // message was corrupt and there was some problem 
unmarshaling. For these
+                // errors we can handle by acking with a poison ack (which 
will send to the DLQ
+                // if durable/queue sub) and remove them from the consumer so 
the consumer can
+                // continue. We do not want to throw the exception as that 
would close the connection.
+                ActiveMQMessageFormatException marshallingError = 
ExceptionUtils.createMessageFormatException(e);
+                if (marshallingError != null) {
+                    handleMessageFormatError(marshallingError, 
messageDispatch);
+                    // must set to null so when we return the finally block is 
skipped
+                    messageDispatch = null;
+                    return;
+                }
                 messageDispatch = null;
                 throw e;
             } else {
@@ -1015,6 +1026,38 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
         }
     }
 
+    private void handleMessageFormatError(ActiveMQMessageFormatException e, 
MessageDispatch messageDispatch) {
+        if (TRANSPORTLOG.isDebugEnabled()) {
+            TRANSPORTLOG.debug("{} had an unexpected Message format error: 
{}", this, e.getMessage(), e);
+        } else if (TRANSPORTLOG.isWarnEnabled()) {
+            if (connector.isDisplayStackTrace()) {
+                TRANSPORTLOG.warn("{} had an unexpected Message format  
error", this, e);
+            } else {
+                TRANSPORTLOG.warn("{} had an unexpected Message format  error: 
{}", this, e.getMessage());
+            }
+        }
+
+        ConsumerBrokerExchange consumerExchange = 
getConsumerBrokerExchange(messageDispatch.getConsumerId());
+        try {
+            // acknowledge with the consumer exchange for this dispatch
+            // This should exist because this error happened during dispatch, 
but if for some
+            // reason it is null it should get handled when delivery is 
attempted again
+            if (consumerExchange != null) {
+                MessageAck ack = new MessageAck();
+                // Acking with a poison ack will send to the DLQ
+                ack.setAckType(MessageAck.POISON_ACK_TYPE);
+                ack.setPoisonCause(e);
+                ack.setConsumerId(messageDispatch.getConsumerId());
+                ack.setDestination(messageDispatch.getDestination());
+                ack.setMessageID(messageDispatch.getMessage().getMessageId());
+                broker.acknowledge(consumerExchange, ack);
+            }
+        } catch (Exception ex) {
+            TRANSPORTLOG.warn("{} could not acknowledge and send message to 
the DLQ after"
+                    + " ActiveMQMessageFormatException: {}", this, 
e.getMessage());
+        }
+    }
+
     @Override
     public boolean iterate() {
         try {
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
index ed0439075c..b3fd69f61c 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
@@ -42,6 +42,7 @@ import org.apache.activemq.transport.TransportFactorySupport;
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.discovery.DiscoveryAgent;
 import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
+import org.apache.activemq.util.ExceptionUtils;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.util.ServiceSupport;
 import org.slf4j.Logger;
@@ -258,10 +259,10 @@ public class TransportConnector implements Connector, 
BrokerServiceAware {
 
             private void onAcceptError(Exception error, String remoteHost) {
                 if (brokerService != null && brokerService.isStopping()) {
-                    LOG.info("Could not accept connection during shutdown {} : 
{} ({})", (remoteHost == null ? "" : "from " + remoteHost), 
error.getLocalizedMessage(), getRootCause(error).getMessage());
+                    LOG.info("Could not accept connection during shutdown {} : 
{} ({})", (remoteHost == null ? "" : "from " + remoteHost), 
error.getLocalizedMessage(),  ExceptionUtils.getRootCause(error).getMessage());
                 } else {
-                    LOG.warn("Could not accept connection {}: {} ({})", 
(remoteHost == null ? "" : "from " + remoteHost), error.getMessage(), 
getRootCause(error).getMessage());
-                    LOG.debug("Reason: " + error.getMessage(), error);
+                    LOG.warn("Could not accept connection {}: {} ({})", 
(remoteHost == null ? "" : "from " + remoteHost), error.getMessage(), 
ExceptionUtils.getRootCause(error).getMessage());
+                    LOG.debug("Reason: {}", error.getMessage(), error);
                 }
             }
         });
@@ -282,20 +283,6 @@ public class TransportConnector implements Connector, 
BrokerServiceAware {
         LOG.info("Connector {} started", getName());
     }
 
-    public static Throwable getRootCause(final Throwable throwable) {
-        final List<Throwable> list = getThrowableList(throwable);
-        return list.isEmpty() ? null : list.get(list.size() - 1);
-    }
-
-    static List<Throwable> getThrowableList(Throwable throwable) {
-        final List<Throwable> list = new ArrayList<>();
-        while (throwable != null && !list.contains(throwable)) {
-            list.add(throwable);
-            throwable = throwable.getCause();
-        }
-        return list;
-    }
-
     public String getPublishableConnectString() throws Exception {
         String publishableConnectString = 
publishedAddressPolicy.getPublishableConnectString(this);
         LOG.debug("Publishing: {} for broker transport URI: {}", 
publishableConnectString, getConnectUri());
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
index f33060b464..ea7582c3eb 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
@@ -38,6 +38,7 @@ import org.apache.activemq.filter.LogicExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.filter.NoLocalExpression;
 import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.ActiveMQMessageFormatException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -106,13 +107,32 @@ public abstract class AbstractSubscription implements 
Subscription {
             }
         }
         try {
-            return (selectorExpression == null || 
selectorExpression.matches(context)) && this.context.isAllowedToConsume(node);
+            return matchesSelector(node, context) && 
this.context.isAllowedToConsume(node);
         } catch (JMSException e) {
             LOG.info("Selector failed to evaluate: {}", e.getMessage(), e);
             return false;
         }
     }
 
+    // This logic exists in a separate method so subscriptions can optionally 
choose to
+    // handle any exception. Normally if an exception is thrown, the matches() 
method
+    // that calls this will just log the error and return. This is correct for 
browsers as
+    // the message gets skipped. It's also correct for topic/durable subs 
because each
+    // sub independently will handle acking/removing if the message does not 
match and
+    // will not block other subs. If there is no matching durable subs the 
message gets
+    // removed from the store as well. However, for queue subscriptions, if 
there is an error
+    // we need to handle ActiveMQMessageFormatException so we don't get stuck 
in a loop
+    // because queues will keep trying to re-add the message to a sub on each 
iteration.
+    protected boolean matchesSelector(MessageReference node, 
MessageEvaluationContext context)
+            throws JMSException, ActiveMQMessageFormatException {
+        return evaluateSelectorExpression(context);
+    }
+
+    // move the original selector expression into its own method so we can 
reference it
+    protected final boolean 
evaluateSelectorExpression(MessageEvaluationContext context) throws 
JMSException {
+        return selectorExpression == null || 
selectorExpression.matches(context);
+    }
+
     @Override
     public boolean isWildcard() {
         return destinationFilter.isWildcard();
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index cd534c8a35..074e2450ee 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -31,6 +32,7 @@ import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -98,6 +100,7 @@ import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.usage.Usage;
 import org.apache.activemq.usage.UsageListener;
 import org.apache.activemq.util.BrokerSupport;
+import org.apache.activemq.ActiveMQMessageFormatException;
 import org.apache.activemq.util.ThreadPoolUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -119,6 +122,8 @@ public class Queue extends BaseDestination implements Task, 
UsageListener, Index
     private final PendingList pagedInMessages = new OrderedPendingList();
     // Messages that are paged in but have not yet been targeted at a 
subscription
     private final ReentrantReadWriteLock pagedInPendingDispatchLock = new 
ReentrantReadWriteLock();
+    // this is guarded by pagedInPendingDispatchLock
+    private final Map<QueueMessageReference, ActiveMQMessageFormatException> 
dispatchMessageFormatErrors = new LinkedHashMap<>();
     protected QueueDispatchPendingList dispatchPendingList = new 
QueueDispatchPendingList();
     private AtomicInteger pendingSends = new AtomicInteger(0);
     private MessageGroupMap messageGroupOwners;
@@ -497,6 +502,8 @@ public class Queue extends BaseDestination implements Task, 
UsageListener, Index
     public void removeSubscription(ConnectionContext context, Subscription 
sub, long lastDeliveredSequenceId)
             throws Exception {
         super.removeSubscription(context, sub, lastDeliveredSequenceId);
+
+        Map<QueueMessageReference, ActiveMQMessageFormatException> 
messageFormatErrors = null;
         // synchronize with dispatch method so that no new messages are sent
         // while removing up a subscription.
         pagedInPendingDispatchLock.writeLock().lock();
@@ -593,7 +600,7 @@ public class Queue extends BaseDestination implements Task, 
UsageListener, Index
                 }
                 // AMQ-5107: don't resend if the broker is shutting down
                 if (dispatchPendingList.hasRedeliveries() && (! 
this.brokerService.isStopping())) {
-                    doDispatch(new OrderedPendingList());
+                    messageFormatErrors = doDispatch(new OrderedPendingList());
                 }
             } finally {
                 consumersLock.writeLock().unlock();
@@ -610,6 +617,9 @@ public class Queue extends BaseDestination implements Task, 
UsageListener, Index
             // https://issues.apache.org/activemq/browse/AMQ-1878
             wakeup();
         }
+
+        // Remove any corrupt messages
+        removeMessageFormatErrorMessages(messageFormatErrors);
     }
 
     private volatile ResourceAllocationException sendMemAllocationException = 
null;
@@ -1885,6 +1895,16 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
         }
     }
 
+    protected void removeAndSendToDlq(ConnectionContext c, 
QueueMessageReference r, Exception e) throws IOException {
+        MessageAck ack = new MessageAck();
+        ack.setAckType(MessageAck.POISON_ACK_TYPE);
+        ack.setPoisonCause(e);
+        ack.setDestination(destination);
+        ack.setMessageID(r.getMessageId());
+        removeMessage(c, null, r, ack);
+        broker.getRoot().sendToDeadLetterQueue(c, r.getMessage(), null, e);
+    }
+
     protected void removeMessage(ConnectionContext c, Subscription subs, 
QueueMessageReference r) throws IOException {
         MessageAck ack = new MessageAck();
         ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
@@ -2192,9 +2212,11 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
         return consumers.size() - browserSubscriptions.size() > 0;
     }
 
-    private void doDispatch(PendingList list) throws Exception {
+    private Map<QueueMessageReference, ActiveMQMessageFormatException> 
doDispatch(PendingList list) throws Exception {
         boolean doWakeUp = false;
 
+        Map<QueueMessageReference, ActiveMQMessageFormatException> 
messageFormatErrors = null;
+
         pagedInPendingDispatchLock.writeLock().lock();
         try {
             if (isPrioritizedMessages() && !dispatchPendingList.isEmpty() && 
list != null && !list.isEmpty()) {
@@ -2223,6 +2245,11 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
                 }
             }
         } finally {
+            // Copy the errors and clear as we need to process outside the lock
+            if (!dispatchMessageFormatErrors.isEmpty()) {
+                messageFormatErrors = new 
LinkedHashMap<>(dispatchMessageFormatErrors);
+                dispatchMessageFormatErrors.clear();
+            }
             pagedInPendingDispatchLock.writeLock().unlock();
         }
 
@@ -2230,6 +2257,8 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
             // avoid lock order contention
             asyncWakeup();
         }
+
+        return messageFormatErrors;
     }
 
     /**
@@ -2237,6 +2266,7 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
      *         were not full.
      */
     private PendingList doActualDispatch(PendingList list) throws Exception {
+
         List<Subscription> consumers;
         consumersLock.readLock().lock();
 
@@ -2262,12 +2292,27 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
                 }
                 if (!fullConsumers.contains(s)) {
                     if (!s.isFull()) {
-                        if (dispatchSelector.canSelect(s, node) && 
assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) 
node).isAcked() ) {
-                            // Dispatch it.
-                            s.add(node);
-                            LOG.trace("assigned {} to consumer {}", 
node.getMessageId(), s.getConsumerInfo().getConsumerId());
+                        try {
+                            if (dispatchSelector.canSelect(s, node) && 
assignMessageGroup(s,
+                                    (QueueMessageReference) node)
+                                    && !((QueueMessageReference) 
node).isAcked()) {
+                                // Dispatch it.
+                                s.add(node);
+                                LOG.trace("assigned {} to consumer {}", 
node.getMessageId(),
+                                        s.getConsumerInfo().getConsumerId());
+                                iterator.remove();
+                                target = s;
+                                break;
+                            }
+                        } catch (ActiveMQMessageFormatException e) {
+                            // An ActiveMQMessageFormatException could occur 
when evaluating
+                            // the selector which could trigger the properties 
to unmarshal or the
+                            // body to be read (for xpath selectors). This 
should be rare but
+                            // if it happens the message will just be stuck so 
we need to remove it
+                            // from the dispatched list and collect it to be 
removed from this queue
+                            // and sent to the DLQ
                             iterator.remove();
-                            target = s;
+                            
dispatchMessageFormatErrors.put((QueueMessageReference) node, e);
                             break;
                         }
                     } else {
@@ -2363,7 +2408,23 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
     }
 
     protected void pageInMessages(boolean force, int maxPageSize) throws 
Exception {
-        doDispatch(doPageInForDispatch(force, true, maxPageSize));
+        Map<QueueMessageReference, ActiveMQMessageFormatException> 
messageFormatErrors =
+            doDispatch(doPageInForDispatch(force, true, maxPageSize));
+        // Handle outside the pagedInPendingDispatchLock
+        removeMessageFormatErrorMessages(messageFormatErrors);
+    }
+
+    // Any bad messages were already removed from dispatchPendingList and not 
dispatched, so now we
+    // need to drop the message, remove it from pagedInMessages, remove from 
the store and
+    // send to the DLQ
+    private void removeMessageFormatErrorMessages(Map<QueueMessageReference, 
ActiveMQMessageFormatException> messageFormatErrors)
+            throws IOException {
+        if (messageFormatErrors != null) {
+            for (Entry<QueueMessageReference, ActiveMQMessageFormatException> 
error : messageFormatErrors.entrySet()) {
+                removeAndSendToDlq(broker.getAdminConnectionContext(), 
error.getKey(),
+                        error.getValue());
+            }
+        }
     }
 
     private void addToConsumerList(Subscription sub) {
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
index fa7c66a1bf..4efffdec77 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
@@ -31,6 +31,7 @@ import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.MarshallingSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,6 +83,13 @@ public class QueueBrowserSubscription extends 
QueueSubscription {
         return !browseDone && super.matches(node, context);
     }
 
+    // Queue browsers can just delegate to the original method and throw a 
JMSException which
+    // will be caught and any message can be skipped if there is an error.
+    protected boolean matchesSelector(MessageReference node, 
MessageEvaluationContext context)
+            throws JMSException {
+        return evaluateSelectorExpression(context);
+    }
+
     /**
      * Since we are a browser we don't really remove the message from the 
queue.
      */
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
index c7517d5c28..2e1ec45940 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
@@ -22,12 +22,12 @@ import jakarta.jms.JMSException;
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.region.group.MessageGroupMap;
-import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.ExceptionUtils;
+import org.apache.activemq.ActiveMQMessageFormatException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -100,4 +100,27 @@ public class QueueSubscription extends 
PrefetchSubscription implements LockOwner
        return result;
     }
 
+    // For queues if a message is in a bad state it could get stuck and will 
block good
+    // messages from being processed. We don't want to handle all errors as 
not all
+    // mean the message is bad, but if we specifically know the message is 
corrupted
+    // then we should remove and DLQ as it may be stuck and not possible to 
ever dispatch.
+    @Override
+    protected boolean matchesSelector(MessageReference node, 
MessageEvaluationContext context)
+            throws JMSException, ActiveMQMessageFormatException {
+        try {
+            return super.matchesSelector(node, context);
+        } catch (JMSException e) {
+            // This may cause the headers to unmarshal which could throw 
ActiveMQUnmarshalEOFException
+            // The body could also be unmarshaled if using XPATH and could 
trigger a
+            // MessageDataFormat exception which this also handles.
+            ActiveMQMessageFormatException formatError = 
ExceptionUtils.createMessageFormatException(e);
+            if (formatError != null) {
+                LOG.error("Message could not be read for selector evaluation: 
{}", e.getMessage(), e);
+                throw formatError;
+            }
+            // if the error is not a message format error, just rethrow 
the JMSException
+            // which will be caught and handled
+            throw e;
+        }
+    }
 }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index bb044c797a..33c49bc8ca 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import java.util.stream.Collectors;
+import org.apache.activemq.ActiveMQMessageFormatException;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
@@ -63,6 +64,7 @@ import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.ExceptionUtils;
 import org.apache.activemq.util.SubscriptionKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -802,9 +804,26 @@ public class Topic extends BaseDestination implements Task 
{
 
         dispatchLock.readLock().lock();
         try {
-            if (!subscriptionRecoveryPolicy.add(context, message)) {
-                return;
+            try {
+                if (!subscriptionRecoveryPolicy.add(context, message)) {
+                    return;
+                }
+            } catch (Exception e) {
+                // In this case couldn't read the header properties so we need 
to catch and continue.
+                // We still need to let dispatchPolicy.dispatch(message, 
msgContext, consumers)
+                // run. If subs set a selector then they won't be matched if 
it can't read
+                // the properties and that code will take care of any 
removal/acks for durables
+                // by calling sub.unmatched(). If no subs match at all then 
onMessageWithNoConsumers()
+                // will be called which allows sending an advisory if enabled 
(or if someone wanted to
+                // do something special like the DLQ).
+                ActiveMQMessageFormatException formatError = 
ExceptionUtils.createMessageFormatException(e);
+                if (formatError != null) {
+                    LOG.warn("Failed to check recovery policy, message is 
corrupt: {}", e.getMessage(), e);
+                } else {
+                    throw e;
+                }
             }
+
             synchronized (consumers) {
                 if (consumers.isEmpty()) {
                     onMessageWithNoConsumers(context, message);
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java
 
b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java
index 6e53c76971..75491ef0fc 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/transport/auto/nio/AutoNIOSSLTransportServer.java
@@ -24,6 +24,7 @@ import org.apache.activemq.transport.tcp.TcpTransport;
 import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
 import org.apache.activemq.transport.tcp.TcpTransportFactory;
 import org.apache.activemq.transport.tcp.TcpTransportServer;
+import org.apache.activemq.util.ExceptionUtils;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.wireformat.WireFormat;
 import org.slf4j.Logger;
@@ -125,7 +126,7 @@ public class AutoNIOSSLTransportServer extends 
AutoTcpTransportServer {
                 } catch (Exception error) {
                     LOG.warn("Could not accept connection {}: {} ({})",
                             (in.getRemoteAddress() == null ? "" : "from " + 
in.getRemoteAddress()), error.getMessage(),
-                            
TransportConnector.getRootCause(error).getMessage());
+                            ExceptionUtils.getRootCause(error).getMessage());
                     throw new IllegalStateException("Could not complete 
Transport start", error);
                 }
 
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageFormatException.java
 
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageFormatException.java
new file mode 100644
index 0000000000..507ff7935b
--- /dev/null
+++ 
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageFormatException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.IOException;
+
+public class ActiveMQMessageFormatException extends IOException {
+
+    public ActiveMQMessageFormatException(Throwable cause) {
+        super(cause);
+    }
+}
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
index a1065dac80..140431573f 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
@@ -189,7 +189,7 @@ public class ActiveMQMapMessage extends ActiveMQMessage 
implements MapMessage {
             if (content != null) {
                 InputStream is = new ByteArrayInputStream(content);
                 if (isCompressed()) {
-                    is = new InflaterInputStream(is);
+                    is = MarshallingSupport.createInflaterInputStream(is);
                 }
                 DataInputStream dataIn = new DataInputStream(is);
                 map = MarshallingSupport.unmarshalPrimitiveMap(dataIn);
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
index 648b448195..1cd0d70e97 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
@@ -1203,6 +1203,7 @@ public class ActiveMQStreamMessage extends 
ActiveMQMessage implements StreamMess
             if (isCompressed()) {
                 is = new InflaterInputStream(is);
                 is = new BufferedInputStream(is);
+                is = 
MarshallingSupport.createFrameLimitedInputStream(Integer.MAX_VALUE, is);
             }
             this.dataIn = new DataInputStream(is);
         }
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
index f0a529af9d..5e73c3314b 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
@@ -95,7 +95,7 @@ public class ActiveMQTextMessage extends ActiveMQMessage 
implements TextMessage
             try {
                 is = new ByteArrayInputStream(bodyAsBytes);
                 if (isCompressed()) {
-                    is = new InflaterInputStream(is);
+                    is = MarshallingSupport.createInflaterInputStream(is);
                 }
                 DataInputStream dataIn = new DataInputStream(is);
                 text = MarshallingSupport.readUTF8(dataIn);
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStream.java
 
b/activemq-client/src/main/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStream.java
index f96b12427d..419d31561a 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStream.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStream.java
@@ -20,6 +20,7 @@ package org.apache.activemq.transport;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Objects;
+import 
org.apache.activemq.util.MarshallingSupport.ActiveMQUnmarshalEOFException;
 
 /**
  * A filtered style input stream that allows reads up to a given known max 
frame size
@@ -223,7 +224,7 @@ public class FrameSizeLimitedFilterInputStream extends 
InputStream {
 
     private static void validateAvailable(int requested, int available) throws 
IOException {
         if (requested > available) {
-            throw new IOException(String.format(
+            throw new ActiveMQUnmarshalEOFException(String.format(
                 "Cannot read more than the max available %d bytes: requested 
%d", available, requested));
         }
     }
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/util/ExceptionUtils.java 
b/activemq-client/src/main/java/org/apache/activemq/util/ExceptionUtils.java
new file mode 100644
index 0000000000..07dae20427
--- /dev/null
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ExceptionUtils.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import jakarta.jms.JMSException;
+import jakarta.jms.MessageEOFException;
+import jakarta.jms.MessageFormatException;
+import jakarta.jms.MessageFormatRuntimeException;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.activemq.ActiveMQMessageFormatException;
+
+public class ExceptionUtils {
+
+
+    /**
+     * Creates a new ActiveMQMessageFormatException by wrapping the existing 
throwable.
+     * This will only wrap the exception if the throwable contains a message 
format
+     * error as the root cause
+     *
+     * @param error original exception
+     * @return ActiveMQMessageFormatException if a message format error, else 
null
+     */
+    public static ActiveMQMessageFormatException 
createMessageFormatException(Throwable error) {
+        if (error instanceof ActiveMQMessageFormatException) {
+            return (ActiveMQMessageFormatException) error;
+        } else if (containsMessageFormatError(error)) {
+            return new ActiveMQMessageFormatException(error);
+        }
+        return null;
+    }
+
+    /*
+     * Check if this throwable contains a message format error.
+     * This will check the root cause and any linked exceptions as well
+     * if the exception is a JMSException
+     */
+    private static boolean containsMessageFormatError(Throwable error) {
+        if (error == null) {
+            return false;
+        }
+
+        Throwable cause = ExceptionUtils.getRootCause(error);
+        return isMessageFormatError(cause) ||
+                error instanceof JMSException && 
isMessageFormatError(((JMSException) error).getLinkedException());
+    }
+
+    /*
+     * Checks if the error is considered an error with the format of the 
message.
+     * This checks for the ActiveMQ custom ActiveMQUnmarshalEOFException that
+     * can be thrown by MarshallingSupport as well as JMS specific exceptions
+     * that indicate corruption/read problems such as MessageFormatException
+     * and MessageEOFException.
+     */
+    private static boolean isMessageFormatError(Throwable error) {
+        if (error == null) {
+            return false;
+        }
+
+        return error instanceof 
MarshallingSupport.ActiveMQUnmarshalEOFException ||
+                error instanceof MessageFormatException ||
+                error instanceof MessageEOFException;
+    }
+
+    public static Throwable getRootCause(final Throwable throwable) {
+        if (throwable == null) {
+            return null;
+        }
+        final List<Throwable> list = getThrowableList(throwable);
+        return list.isEmpty() ? null : list.get(list.size() - 1);
+    }
+
+    static List<Throwable> getThrowableList(Throwable throwable) {
+        final List<Throwable> list = new ArrayList<>();
+        while (throwable != null && !list.contains(throwable)) {
+            list.add(throwable);
+            throwable = throwable.getCause();
+        }
+        return list;
+    }
+}
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/util/MarshallingSupport.java
 
b/activemq-client/src/main/java/org/apache/activemq/util/MarshallingSupport.java
index bf61f307c3..c7bc76f02f 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/util/MarshallingSupport.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/util/MarshallingSupport.java
@@ -16,11 +16,12 @@
  */
 package org.apache.activemq.util;
 
-import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.UTFDataFormatException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -28,7 +29,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import java.util.zip.InflaterInputStream;
+import org.apache.activemq.transport.FrameSizeLimitedFilterInputStream;
 import org.fusesource.hawtbuf.UTF8Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The fixed version of the UTF8 encoding function. Some older JVM's UTF8
@@ -36,6 +41,8 @@ import org.fusesource.hawtbuf.UTF8Buffer;
  */
 public final class MarshallingSupport {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(MarshallingSupport.class);
+
     public static final byte NULL = 0;
     public static final byte BOOLEAN_TYPE = 1;
     public static final byte BYTE_TYPE = 2;
@@ -53,6 +60,15 @@ public final class MarshallingSupport {
 
     private MarshallingSupport() {}
 
+    // TODO: This will be limited in a future PR to something besides 
Integer.MAX_VALUE
+    public static InputStream createInflaterInputStream(InputStream is) {
+        return createFrameLimitedInputStream(Integer.MAX_VALUE, new 
InflaterInputStream(is));
+    }
+
+    public static InputStream createFrameLimitedInputStream(int maxAvailable, 
InputStream is) {
+        return new FrameSizeLimitedFilterInputStream(maxAvailable, is);
+    }
+
     public static void marshalPrimitiveMap(Map<String, Object> map, 
DataOutputStream out) throws IOException {
         if (map == null) {
             out.writeInt(-1);
@@ -67,7 +83,7 @@ public final class MarshallingSupport {
     }
 
     public static Map<String, Object> unmarshalPrimitiveMap(DataInputStream 
in) throws IOException {
-        return unmarshalPrimitiveMap(in, Integer.MAX_VALUE, Integer.MAX_VALUE, 
Integer.MAX_VALUE);
+        return unmarshalPrimitiveMap(in, Integer.MAX_VALUE, Integer.MAX_VALUE, 
Byte.MAX_VALUE);
     }
 
     public static Map<String, Object> unmarshalPrimitiveMap(DataInputStream 
in, int maxPropertySize, int maxBufferSize, int maxDepth) throws IOException {
@@ -86,16 +102,23 @@ public final class MarshallingSupport {
         validateDepth(maxDepth, currentDepth++);
 
         int size = in.readInt();
-        if (size > maxPropertySize) {
-            throw new IOException("Primitive map is larger than the allowed 
size: " + size);
-        }
+        // validate not larger than configured max number of entries
+        validatePropertySize(size, maxPropertySize);
+
         if (size < 0) {
             return null;
         } else {
-            // Size here was already validated above
-            Map<String, Object> rc = new HashMap<>(size);
+            // Limit the pre-allocate size of the map.
+            // The number of items was validated but could still exceed total size 
as we unmarshal
+            // (and max property size may not be set), so do a sanity check to 
verify
+            // the number of items is less than or equal to the remaining 
bytes.
+            validateBufferSize(in, maxBufferSize, size);
+
+            // As an extra precaution limit to no more than 128 initially
+            Map<String, Object> rc = new HashMap<>(Math.min(128, size));
             for (int i = 0; i < size; i++) {
-                String name = in.readUTF();
+                // validate key is less than max buffer size
+                String name = readUTF(in, maxBufferSize, 
in.readUnsignedShort()).toString();
                 rc.put(name, unmarshalPrimitive(in, force, maxPropertySize, 
maxBufferSize, maxDepth, currentDepth));
             }
             return rc;
@@ -119,8 +142,13 @@ public final class MarshallingSupport {
         // increment after validation, so future calls get the incremented 
depth
         validateDepth(maxDepth, currentDepth++);
 
-        int size = validateBufferSize(maxBufferSize, in.readInt());
-        List<Object> answer = new ArrayList<>(size);
+        // Limit the pre-allocate size of the list.
+        // We could still exceed total size as we unmarshal, so do a sanity 
check to verify
+        // the number of items is less than or equal to the remaining bytes
+        int size = validateBufferSize(in, maxBufferSize, in.readInt());
+
+        // As an extra precaution limit to no more than 128 initially
+        List<Object> answer = new ArrayList<>(Math.min(128, size));
         while (size-- > 0) {
             answer.add(unmarshalPrimitive(in, force, maxPropertySize, 
maxBufferSize, maxDepth, currentDepth));
         }
@@ -164,7 +192,7 @@ public final class MarshallingSupport {
     }
 
     public static Object unmarshalPrimitive(DataInputStream in) throws 
IOException {
-        return unmarshalPrimitive(in, false, Integer.MAX_VALUE, 
Integer.MAX_VALUE, Integer.MAX_VALUE, 0);
+        return unmarshalPrimitive(in, false, Integer.MAX_VALUE, 
Integer.MAX_VALUE, Byte.MAX_VALUE, 0);
     }
 
     private static Object unmarshalPrimitive(DataInputStream in, boolean 
force, int maxPropertySize, int maxBufferSize,
@@ -197,15 +225,11 @@ public final class MarshallingSupport {
             value = in.readDouble();
             break;
         case BYTE_ARRAY_TYPE:
-            value = new byte[validateBufferSize(maxBufferSize, in.readInt())];
+            value = new byte[validateBufferSize(in, maxBufferSize, 
in.readInt())];
             in.readFully((byte[])value);
             break;
         case STRING_TYPE:
-            if (force) {
-                value = in.readUTF();
-            } else {
-                value = readUTF(in, maxBufferSize, in.readUnsignedShort());
-            }
+            value = readUTF(in, maxBufferSize, in.readUnsignedShort());
             break;
         case BIG_STRING_TYPE: {
             if (force) {
@@ -231,7 +255,7 @@ public final class MarshallingSupport {
     }
 
     public static UTF8Buffer readUTF(DataInputStream in, int maxLength, int 
length) throws IOException {
-        validateBufferSize(maxLength, length);
+        validateBufferSize(in, maxLength, length);
         byte[] data = new byte[length];
         in.readFully(data);
         return new UTF8Buffer(data);
@@ -357,12 +381,12 @@ public final class MarshallingSupport {
         return offset;
     }
 
-    public static String readUTF8(DataInput dataIn) throws IOException {
+    public static String readUTF8(DataInputStream dataIn) throws IOException {
         return readUTF8(dataIn, Integer.MAX_VALUE);
     }
 
-    public static String readUTF8(DataInput dataIn, int maxBufferSize) throws 
IOException {
-        int utflen = validateBufferSize(maxBufferSize, dataIn.readInt());
+    static String readUTF8(DataInputStream dataIn, int maxBufferSize) throws 
IOException {
+        int utflen = validateBufferSize(dataIn, maxBufferSize, 
dataIn.readInt());
         if (utflen > -1) {
             byte bytearr[] = new byte[utflen];
             char chararr[] = new char[utflen];
@@ -432,16 +456,34 @@ public final class MarshallingSupport {
         return text;
     }
 
-    private static int validateBufferSize(int maxSize, int size) throws 
IOException {
+    private static void validatePropertySize(int size, int maxPropertySize) 
throws IOException {
+        if (size > maxPropertySize) {
+            throw new ActiveMQUnmarshalEOFException("Primitive map is larger 
than the allowed size: " + size);
+        }
+    }
+
+    private static int validateBufferSize(DataInputStream stream, int maxSize, 
int size) throws IOException {
+        // The size should never be more than what is available to read
+        if (size > stream.available()) {
+            throw new ActiveMQUnmarshalEOFException("Read is greater than 
remaining available data in the stream");
+        }
+
         if (size > maxSize) {
-            throw new IOException("Max buffer size: " + maxSize + " exceeded, 
size: " + size);
+            throw new ActiveMQUnmarshalEOFException("Max buffer size: " + 
maxSize + " exceeded, size: " + size);
         }
         return size;
     }
 
-    private static void validateDepth(int maxDepth, int currentDepth) throws 
IOException {
+    private static void validateDepth(int maxDepth, int currentDepth) throws 
EOFException {
         if (currentDepth > maxDepth) {
-            throw new IOException("Max unmarshaling depth: " + maxDepth + " 
exceeded, depth: " + currentDepth);
+            throw new ActiveMQUnmarshalEOFException("Max unmarshaling depth: " 
+ maxDepth + " exceeded, depth: " + currentDepth);
         }
     }
+
+    public static class ActiveMQUnmarshalEOFException extends EOFException {
+        public ActiveMQUnmarshalEOFException(String message) {
+            super(message);
+        }
+    }
+
 }
diff --git 
a/activemq-client/src/test/java/org/apache/activemq/util/MarshallingSupportTest.java
 
b/activemq-client/src/test/java/org/apache/activemq/util/MarshallingSupportTest.java
index 59a5e602d6..d9e778c8e5 100644
--- 
a/activemq-client/src/test/java/org/apache/activemq/util/MarshallingSupportTest.java
+++ 
b/activemq-client/src/test/java/org/apache/activemq/util/MarshallingSupportTest.java
@@ -79,8 +79,8 @@ public class MarshallingSupportTest {
 
         // buffers too large
         dataIn.reset();
-        assertException(() -> unmarshalPrimitiveMap(dataIn, 100, 2, 10),
-                "Max buffer size: 2 exceeded, size: 6");
+        assertException(() -> unmarshalPrimitiveMap(dataIn, 100, 4, 10),
+                "Max buffer size: 4 exceeded, size: 6");
 
         // max depth violated
         dataIn.reset();
diff --git 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 677bec3944..6f0176a558 100644
--- 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -62,8 +62,10 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import 
org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
 import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.ByteSequenceData;
 import org.apache.activemq.util.JMXSupport;
 import org.apache.activemq.util.NioSslTestUtil;
 import org.apache.activemq.util.Wait;
@@ -1130,6 +1132,68 @@ public class MQTTTest extends MQTTTestSupport {
         activeMQConnection.close();
     }
 
+    // The following test will corrupt a message and verify that the MQTT
+    // protocol correctly propagates the error during
+    // dispatch, allowing the Transport Connection to handle it
+    // with a poison ack so the message will be removed from the subscription.
+    @Test
+    public void testCorruptHeaders() throws Exception {
+        final MQTTClientProvider provider = getMQTTClientProvider();
+        initializeConnection(provider);
+
+        String destinationName = "foo.far";
+        ActiveMQConnection activeMQConnection = (ActiveMQConnection) 
cf.createConnection();
+        activeMQConnection.start();
+        Session s = activeMQConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        jakarta.jms.Topic jmsTopic = s.createTopic(destinationName);
+        MessageProducer producer = s.createProducer(jmsTopic);
+
+        provider.subscribe("foo/+", AT_MOST_ONCE);
+        ActiveMQMessage sendMessage = (ActiveMQMessage) 
s.createTextMessage("test");
+        sendMessage.setStringProperty("test", "Test");
+        // marshal and corrupt props
+        sendMessage.beforeMarshall(null);
+        ByteSequenceData.writeIntBig(sendMessage.getMarshalledProperties(), 
1000);
+        producer.send(sendMessage);
+        assertNull("Should not get message", provider.receive(500));
+
+        provider.disconnect();
+        activeMQConnection.close();
+
+        // verify message is gone off the dest
+        assertTrue(Wait.waitFor(() -> brokerService.getDestination(new 
ActiveMQTopic(destinationName))
+                .getDestinationStatistics().getMessages().getCount() == 0, 
500, 10));
+    }
+
+    @Test
+    public void testCorruptBody() throws Exception {
+        final MQTTClientProvider provider = getMQTTClientProvider();
+        initializeConnection(provider);
+
+        String destinationName = "foo.far";
+        ActiveMQConnection activeMQConnection = (ActiveMQConnection) 
cf.createConnection();
+        activeMQConnection.start();
+        Session s = activeMQConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        jakarta.jms.Topic jmsTopic = s.createTopic(destinationName);
+        MessageProducer producer = s.createProducer(jmsTopic);
+
+        provider.subscribe("foo/+", AT_MOST_ONCE);
+        ActiveMQMessage sendMessage = (ActiveMQMessage) 
s.createTextMessage("test");
+
+        // marshal and corrupt body
+        sendMessage.storeContentAndClear();
+        ByteSequenceData.writeIntBig(sendMessage.getContent(), 1000);
+        producer.send(sendMessage);
+        assertNull("Should not get message", provider.receive(500));
+
+        provider.disconnect();
+        activeMQConnection.close();
+
+        // verify message is gone off the dest
+        assertTrue(Wait.waitFor(() -> brokerService.getDestination(new 
ActiveMQTopic(destinationName))
+                .getDestinationStatistics().getMessages().getCount() == 0, 
500, 10));
+    }
+
     @Test(timeout = 60 * 1000)
     public void testPingKeepsInactivityMonitorAlive() throws Exception {
         MQTT mqtt = createMQTTConnection();
diff --git 
a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
 
b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
index 2fa9651194..92f034cf7a 100644
--- 
a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
+++ 
b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
@@ -74,6 +74,23 @@ public class StompSubscription {
     void onMessageDispatch(MessageDispatch md) throws IOException, 
JMSException {
         ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
 
+        boolean ignoreTransformation = false;
+
+        if (transformation != null && !( message instanceof 
ActiveMQBytesMessage ) ) {
+            message.setReadOnlyProperties(false);
+            message.setStringProperty(Stomp.Headers.TRANSFORMATION, 
transformation);
+        } else {
+            if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != 
null) {
+                ignoreTransformation = true;
+            }
+        }
+
+        // This has been intentionally moved to happen before the acks are set 
up and
+        // auto ack is done, which is in line with all the other protocols.
+        StompFrame command = protocolConverter.convertMessage(message, 
ignoreTransformation);
+
+        // Only configure the acks after protocol conversion. If there is an 
error we don't want to
+        // track pending acks or auto ack as the message won't be dispatched
         String ackId = null;
         if (isClientAck() || isIndividualAck()) {
             ackId = ACK_ID_GENERATOR.generateId();
@@ -90,19 +107,6 @@ public class StompSubscription {
             protocolConverter.getStompTransport().sendToActiveMQ(ack);
         }
 
-        boolean ignoreTransformation = false;
-
-        if (transformation != null && !( message instanceof 
ActiveMQBytesMessage ) ) {
-            message.setReadOnlyProperties(false);
-            message.setStringProperty(Stomp.Headers.TRANSFORMATION, 
transformation);
-        } else {
-            if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != 
null) {
-                ignoreTransformation = true;
-            }
-        }
-
-        StompFrame command = protocolConverter.convertMessage(message, 
ignoreTransformation);
-
         command.setAction(Stomp.Responses.MESSAGE);
         if (subscriptionId != null) {
             command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, 
subscriptionId);
diff --git 
a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
 
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
index e843e8d942..783e439d00 100644
--- 
a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
+++ 
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
@@ -33,6 +33,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -53,11 +54,18 @@ import jakarta.jms.TextMessage;
 import javax.management.ObjectName;
 
 import javax.net.ssl.SSLSocket;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.jmx.BrokerViewMBean;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.broker.region.AbstractSubscription;
+import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
@@ -68,6 +76,7 @@ import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.transport.stomp.Stomp.Commands;
 import org.apache.activemq.transport.stomp.Stomp.Responses;
+import org.apache.activemq.util.ByteSequenceData;
 import org.apache.activemq.util.NioSslTestUtil;
 import org.apache.activemq.util.Wait;
 import org.junit.Assume;
@@ -121,6 +130,9 @@ public class StompTest extends StompTestSupport {
         + "]"
         + "}}";
 
+
+    final AtomicBoolean sentToDlq = new AtomicBoolean(false);
+
     @Override
     public void setUp() throws Exception {
         queue = new ActiveMQQueue(getQueueName());
@@ -135,6 +147,7 @@ public class StompTest extends StompTestSupport {
         xstream = new XStream();
         xstream.processAnnotations(SamplePojo.class);
         xstream.allowTypes(new Class[] { SamplePojo.class });
+        sentToDlq.set(false);
     }
 
     @Override
@@ -157,6 +170,26 @@ public class StompTest extends StompTestSupport {
         }
     }
 
+    @Override
+    protected void addAdditionalPlugins(List<BrokerPlugin> plugins) throws 
Exception {
+        super.addAdditionalPlugins(plugins);
+        plugins.add(new BrokerPluginSupport() {
+            @Override
+            public Broker installPlugin(Broker broker) {
+                return new BrokerFilter(broker) {
+                    @Override
+                    public boolean sendToDeadLetterQueue(ConnectionContext 
context,
+                            MessageReference messageReference, Subscription 
subscription,
+                            Throwable poisonCause) {
+                        sentToDlq.set(true);
+                        return super.sendToDeadLetterQueue(context, 
messageReference,
+                                subscription, poisonCause);
+                    }
+                };
+            }
+        });
+    }
+
     public void sendMessage(String msg) throws Exception {
         sendMessage(msg, "foo", "xyz");
     }
@@ -290,6 +323,40 @@ public class StompTest extends StompTestSupport {
         assertEquals(body, message.getBody());
     }
 
+    // The following test will corrupt a message and verify that the Stomp
+    // protocol correctly propagates the error during
+    // dispatch, allowing the Transport Connection to handle it
+    // with a poison ack so the message will be removed from the subscription.
+    @Test(timeout = 60000)
+    public void testCorruptMessage() throws Exception {
+        MessageProducer producer = session.createProducer(queue);
+        String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" 
+ Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+        frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" 
+ "ack:auto\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        ActiveMQTextMessage msg = (ActiveMQTextMessage) 
session.createTextMessage("test");
+
+        // corrupt the buffer
+        msg.storeContentAndClear();
+        ByteSequenceData.writeIntBig(msg.getContent(), 1000);
+        producer.send(msg);
+
+        // Message should not be received because the UTF8 buffer is corrupt
+        try {
+            StompFrame frameNull = stompConnection.receive(500);
+            if (frameNull != null) {
+                fail("Should not have received any messages");
+            }
+        } catch (SocketTimeoutException ignored) {}
+
+        // Message should go to the DLQ
+        assertTrue(Wait.waitFor(() -> brokerService.getDestination(queue)
+                .getDestinationStatistics().getMessages().getCount() == 0, 
500, 10));
+        assertTrue(sentToDlq.get());
+    }
+
     @Test(timeout = 60000)
     public void testJMSXGroupIdCanBeSet() throws Exception {
 
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQMessageFormatExceptionSelectorTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQMessageFormatExceptionSelectorTest.java
new file mode 100644
index 0000000000..d68701e1be
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQMessageFormatExceptionSelectorTest.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.QueueBrowser;
+import jakarta.jms.Session;
+import jakarta.jms.Topic;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Enumeration;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.store.kahadb.disk.journal.Journal;
+import org.apache.activemq.test.annotations.ParallelTest;
+import org.apache.activemq.util.ByteSequenceData;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+
+@Category(ParallelTest.class)
+public class ActiveMQMessageFormatExceptionSelectorTest {
+
+    @Rule
+    public TemporaryFolder dataFileDir = new TemporaryFolder(new 
File("target"));
+
+    private URI clientUri;
+    private BrokerService brokerService;
+    private final AtomicInteger dlqCount = new AtomicInteger();
+
+    @Before
+    public void setUp() throws Exception {
+        dlqCount.set(0);
+        startBroker();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        stopBroker();
+    }
+
+    // Test that queue browsers just skip corrupt messages as they are only 
going to be
+    // DLQ'd and removed for normal queue consumers
+    @Test(timeout = 30000)
+    public void testUnmarshalQueueBrowseSubscription() throws Exception {
+        try (ActiveMQConnection connection = (ActiveMQConnection) new 
ActiveMQConnectionFactory(
+                clientUri).createConnection()) {
+            connection.setClientID("client");
+            connection.start();
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue dest = session.createQueue("test.queue");
+            MessageProducer producer = session.createProducer(dest);
+            QueueBrowser browser =  session.createBrowser(dest, 
"stringProperty = 'a'");
+
+            // Mix good and bad messages to ensure the good ones are still 
received
+            for (int i = 0; i < 100; i++) {
+                ActiveMQTextMessage message = (ActiveMQTextMessage) 
session.createTextMessage("test-message");
+                message.setIntProperty("count", i);
+                message.setStringProperty("stringProperty", "a");
+
+                if (i % 5 == 0) {
+                    message.beforeMarshall(null);
+                    
ByteSequenceData.writeIntBig(message.getMarshalledProperties(), 1024 * 1024);
+                }
+
+                producer.send(message);
+            }
+
+            Enumeration enumeration = browser.getEnumeration();
+            int i = 0;
+            while (enumeration.hasMoreElements()) {
+                // skip expected bad message
+                if (i % 5 == 0) {
+                    i++;
+                }
+                jakarta.jms.Message message = (Message) 
enumeration.nextElement();
+                assertNotNull(message);
+                assertEquals(i, message.getIntProperty("count"));
+                i++;
+            }
+
+            Destination destination = 
brokerService.getDestination((ActiveMQDestination) dest);
+            // browsers should NOT remove/move messages
+            assertEquals(100, 
destination.getDestinationStatistics().getMessages().getCount());
+            assertTrue(destination.getMemoryUsage().getUsage() > 0);
+            assertEquals(0, dlqCount.get());
+            assertEquals(100, destination.getMessageStore().getMessageCount());
+        }
+    }
+
+    @Test(timeout = 30000)
+    public void testUnmarshalQueueSubscription() throws Exception {
+        testUnmarshalFail("test.queue", true, false);
+        // For queues, the message is already accepted onto the queue so
+        // if there is an error when trying to match for a consumer, and it is
+        // corrupted it makes sense to just remove with the first error and
+        // send to the DLQ
+        assertEquals(20, dlqCount.get());
+    }
+
+    @Test(timeout = 30000)
+    public void testUnmarshalTopicSubscription() throws Exception {
+        testUnmarshalFail("test.topic", false, false);
+        // for topic subscription on error it just won't match the selector
+        // and skips adding to the sub, but other subs may be able to receive
+        // as it evaluates each sub independently (not all subs may use a 
selector)
+        // Messages won't be stuck and block other subscriptions so no need to
+        // do anything special and there is no DLQ because it was never even 
added
+        // to the subscription
+    }
+
+    @Test(timeout = 30000)
+    public void testUnmarshalDurableSubscription() throws Exception {
+        testUnmarshalFail("test.topic", false, true);
+        // for durable subscription on error it just won't match the selector
+        // and skips adding to the sub, but other subs may be able to receive
+        // as it evaluates each sub independently (not all subs may use a 
selector)
+        // Messages won't be stuck and block other subscriptions so no need to
+        // do anything special and there is no DLQ because it was never even 
added
+        // to the subscription
+    }
+
+    // Test mixing subs with and without selectors
+    @Test(timeout = 30000)
+    public void testMultipleTopicSubs() throws Exception {
+        testMultipleSubs(false);
+    }
+
+    @Test(timeout = 30000)
+    public void testMultipleDurableSubs() throws Exception {
+        testMultipleSubs(true);
+    }
+
+    private void testMultipleSubs(boolean durable) throws Exception {
+        try (ActiveMQConnection connection = (ActiveMQConnection) new 
ActiveMQConnectionFactory(
+                clientUri).createConnection()) {
+            connection.setClientID("client");
+            connection.start();
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            ActiveMQTopic dest =  (ActiveMQTopic) session.createTopic("topic");
+            MessageProducer producer = session.createProducer(dest);
+            MessageConsumer selectorConsumer = durable ?
+                    session.createDurableSubscriber((Topic) dest, "sub", 
"stringProperty = 'a'", false) :
+                    session.createConsumer(dest, "stringProperty = 'a'");
+            MessageConsumer consumer = durable ?
+                    session.createDurableSubscriber((Topic) dest, "sub2") : 
session.createConsumer(dest);
+
+            // Every message sent here has corrupt properties; no good ones are 
mixed in
+            for (int i = 0; i < 10; i++) {
+                ActiveMQTextMessage message = (ActiveMQTextMessage) 
session.createTextMessage("test-message");
+                message.setIntProperty("count", i);
+                message.setStringProperty("stringProperty", "a");
+                message.beforeMarshall(null);
+                // corrupt
+                
ByteSequenceData.writeIntBig(message.getMarshalledProperties(), 1024 * 1024);
+                producer.send(message);
+            }
+
+            // no selector should get all 10
+            for (int i = 0; i < 10; i++) {
+                jakarta.jms.Message message = consumer.receive(100);
+                assertNotNull(message);
+            }
+
+            // selector so should error and not get any
+            assertNull(selectorConsumer.receive(100));
+
+            // messages should be gone
+            Destination destination = brokerService.getDestination(dest);
+            assertTrue(Wait.waitFor(() -> 
destination.getDestinationStatistics().getMessages().getCount() == 0,
+                    500, 10));
+            assertTrue(Wait.waitFor(
+                    () -> destination.getMemoryUsage().getUsage() == 0, 1000, 
100));
+        }
+    }
+
+    private void testUnmarshalFail(String destName, boolean queue, boolean 
durable) throws Exception {
+        try (ActiveMQConnection connection = (ActiveMQConnection) new 
ActiveMQConnectionFactory(
+                clientUri).createConnection()) {
+            connection.setClientID("client");
+            connection.start();
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            ActiveMQDestination dest = queue ? (ActiveMQDestination) 
session.createQueue(destName) :
+                    (ActiveMQDestination) session.createTopic(destName);
+            MessageProducer producer = session.createProducer(dest);
+            MessageConsumer consumer = durable ?
+                    session.createDurableSubscriber((Topic) dest, "sub", 
"stringProperty = 'a'", false) :
+                    session.createConsumer(dest, "stringProperty = 'a'");
+
+            // Mix good and bad messages to ensure the good ones are still 
received
+            for (int i = 0; i < 100; i++) {
+                ActiveMQTextMessage message = (ActiveMQTextMessage) 
session.createTextMessage("test-message");
+                message.setIntProperty("count", i);
+                message.setStringProperty("stringProperty", "a");
+
+                if (i % 5 == 0) {
+                    message.beforeMarshall(null);
+                    
ByteSequenceData.writeIntBig(message.getMarshalledProperties(), 1024 * 1024);
+                }
+                producer.send(message);
+            }
+
+            for (int i = 0; i < 100; i++) {
+                // skip expected bad message
+                if (i % 5 == 0) {
+                    i++;
+                }
+                jakarta.jms.Message message = consumer.receive(100);
+                assertNotNull(message);
+                assertEquals(i, message.getIntProperty("count"));
+            }
+
+            Destination destination = brokerService.getDestination(dest);
+            assertTrue(Wait.waitFor(() -> 
destination.getDestinationStatistics().getMessages().getCount() == 0,
+                    500, 10));
+            assertTrue(Wait.waitFor(
+                    () -> destination.getMemoryUsage().getUsage() == 0, 1000, 
100));
+            assertEquals(0, destination.getMessageStore().getMessageCount());
+        }
+    }
+
+    // test that the messages that are corrupt are still handled by the queue
+    // on restart and load when consumers come online. This is really only a 
big concern for queues.
+    // For topic/durables subs the messages won't be added as they won't match 
in the first place
+    // if corrupt so they should not be there on restart. If they are for some 
reason
+    // (maybe the broker crashed during sub matching and didn't ack) then an 
exception is logged and
+    // they get skipped and not loaded (this is the exact same behavior as 
previously). Topic sub
+    // behavior on restart could look to be improved in a future change but is 
an edge case
+    // and logging the error allows an admin to address if it does happen for 
topics
+    @Test
+    public void testRestart() throws Exception {
+        ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(clientUri);
+        ActiveMQConnection connection = (ActiveMQConnection) 
factory.createConnection();
+        connection.setClientID("client");
+        connection.start();
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        ActiveMQDestination dest = (ActiveMQDestination) 
session.createQueue("Test'") ;
+        MessageProducer producer = session.createProducer(dest);
+
+        // send 10 good and 10 bad
+        for (int i = 0; i < 20; i++) {
+            ActiveMQTextMessage message = (ActiveMQTextMessage) 
session.createTextMessage("test-message");
+            message.setStringProperty("stringProperty", "a");
+            if (i % 2 == 0) {
+                message.beforeMarshall(null);
+                
ByteSequenceData.writeIntBig(message.getMarshalledProperties(), 1024 * 1024);
+            }
+            producer.send(message);
+        }
+        connection.close();
+        // All 20 messages should be persisted as no consumers yet
+        Destination destination = brokerService.getDestination(dest);
+        assertEquals(20, destination.getMessageStore().getMessageCount());
+        // stop and restart broker to verify it loads all the messages
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+        startBroker();
+
+        // bring the consumer online which should trigger the errors and DLQ
+        factory = new ActiveMQConnectionFactory(clientUri);
+        connection = (ActiveMQConnection) factory.createConnection();
+        connection.setClientID("client");
+        connection.start();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(dest, 
"stringProperty = 'a'");
+
+        // we should get 10
+        for (int i = 0; i < 10; i++) {
+            jakarta.jms.Message message = consumer.receive(100);
+            assertNotNull(message);
+        }
+        // should only get 10
+        assertNull(consumer.receive(100));
+        Destination regionDest = brokerService.getDestination(dest);
+        // makes sure messages are gone
+        assertTrue(Wait.waitFor(() -> 
regionDest.getDestinationStatistics().getMessages().getCount() == 0,
+                500, 10));
+        assertTrue(Wait.waitFor(
+                () -> regionDest.getMemoryUsage().getUsage() == 0, 1000, 100));
+        assertEquals(0, regionDest.getMessageStore().getMessageCount());
+    }
+
+    // Xpath selector will trigger the body to unmarshal, make sure that is 
handled as well
+    // The queue should detect that error when trying to add to the consumer 
and DLQ
+    @Test
+    public void testXpath() throws Exception {
+        try (ActiveMQConnection connection = (ActiveMQConnection) new 
ActiveMQConnectionFactory(
+                clientUri).createConnection()) {
+            connection.start();
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            ActiveMQDestination dest = (ActiveMQDestination) 
session.createQueue("test.queue");
+            MessageProducer producer = session.createProducer(dest);
+            MessageConsumer consumer = session.createConsumer(dest, "XPATH 
'//books//book[@lang=''en'']'");
+
+            ActiveMQTextMessage message = (ActiveMQTextMessage) 
session.createTextMessage(
+                    "<?xml version=\"1.0\" encoding=\"UTF-8\"?><books><book 
lang=\"en\">ABC</book></books>");
+            message.storeContentAndClear();
+            ByteSequenceData.writeIntBig(message.getContent(), 1024 * 1024);
+            producer.send(message);
+
+            assertNull(consumer.receive(100));
+
+            Destination destination = brokerService.getDestination(dest);
+            assertTrue(Wait.waitFor(() -> 
destination.getDestinationStatistics().getMessages().getCount() == 0,
+                    500, 10));
+            assertTrue(Wait.waitFor(
+                    () -> destination.getMemoryUsage().getUsage() == 0, 1000, 
100));
+            assertEquals(0, destination.getMessageStore().getMessageCount());
+        }
+    }
+
+
+    protected void startBroker() throws Exception {
+        brokerService = new BrokerService();
+        PolicyEntry policy = new PolicyEntry();
+        policy.setSendAdvisoryIfNoConsumers(true);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        brokerService.setDestinationPolicy(pMap);
+        brokerService.setPersistent(true);
+        brokerService.setDataDirectoryFile(dataFileDir.getRoot());
+        KahaDBStore store = new KahaDBStore();
+        store.setDirectory(dataFileDir.getRoot());
+        
store.setJournalDiskSyncStrategy(Journal.JournalDiskSyncStrategy.NEVER.name());
+        brokerService.setPersistenceAdapter(store);
+        brokerService.setUseJmx(false);
+        brokerService.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
+            @Override
+            public Broker installPlugin(Broker broker) {
+                return new BrokerFilter(broker) {
+                    @Override
+                    public boolean sendToDeadLetterQueue(ConnectionContext 
context,
+                            MessageReference messageReference, Subscription 
subscription,
+                            Throwable poisonCause) {
+                        dlqCount.getAndIncrement();
+                        return super.sendToDeadLetterQueue(context, 
messageReference,
+                                subscription, poisonCause);
+                    }
+                };
+            }
+        }});
+
+        TransportConnector connector = 
brokerService.addConnector("nio://localhost:0");
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+        clientUri = connector.getPublishableConnectURI();
+    }
+
+    public void stopBroker() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+            brokerService = null;
+        }
+    }
+}
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/SyncSendPacketTimeoutTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/SyncSendPacketTimeoutTest.java
index fb2a13f8df..fef0d358dc 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/SyncSendPacketTimeoutTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/SyncSendPacketTimeoutTest.java
@@ -34,6 +34,7 @@ import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.transport.RequestTimedOutIOException;
+import org.apache.activemq.util.ExceptionUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -170,7 +171,7 @@ public class SyncSendPacketTimeoutTest {
             }
             assertNotNull("Should have caught a JMSException", exception);
             assertEquals(RequestTimedOutIOException.class,
-                    TransportConnector.getRootCause(exception).getClass());
+                    ExceptionUtils.getRootCause(exception).getClass());
         }
     }
 
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java
index 82861d4af1..c09c5e03ef 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java
@@ -21,6 +21,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -33,7 +35,11 @@ import jakarta.jms.MessageFormatException;
 import jakarta.jms.MessageNotReadableException;
 import jakarta.jms.MessageNotWriteableException;
 
+import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.test.annotations.ParallelTest;
+import org.apache.activemq.util.ByteSequenceData;
+import org.apache.activemq.util.MarshallingSupport.ActiveMQUnmarshalEOFException;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
@@ -471,4 +477,31 @@ public class ActiveMQMapMessageTest {
         msg.getShort("short");
         msg.getString("string");
     }
+
+    @Test
+    public void testUnmarshalException() throws Exception {
+        ActiveMQConnection connection = mock(ActiveMQConnection.class);
+
+        ActiveMQMapMessage msg = new ActiveMQMapMessage();
+        msg.setConnection(connection);
+        msg.setString("test", "test");
+
+        // store and marshal
+        msg.storeContentAndClear();
+        assertTrue(msg.map.isEmpty());
+
+        // corrupt the buffer
+        ByteSequenceData.writeIntBig(msg.content, 1000);
+
+        try {
+            // trigger unmarshalling the map
+            msg.getString("test");
+            fail("Should have thrown exception");
+        } catch (JMSException e) {
+            // expected
+            assertTrue(
+                    ExceptionUtils.getRootCause(e) instanceof ActiveMQUnmarshalEOFException);
+        }
+    }
+
 }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java
index 87f1a54228..1967f7b139 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java
@@ -32,6 +32,8 @@ import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.state.CommandVisitor;
 import org.apache.activemq.test.annotations.ParallelTest;
 import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.ByteSequenceData;
+import org.apache.activemq.util.MarshallingSupport.ActiveMQUnmarshalEOFException;
 import org.apache.activemq.wireformat.WireFormat;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
@@ -993,4 +995,25 @@ public class ActiveMQMessageTest extends TestCase {
         msg.setJMSExpiration(System.currentTimeMillis() + 10000);
         assertFalse(msg.isExpired());
     }
+
+    public void testUnmarshalPropertiesException() throws Exception {
+        ActiveMQMessage msg = new ActiveMQMessage();
+        msg.setProperty("test", "test");
+
+        // marshal properties and clear unmarshaled state
+        msg.beforeMarshall(null);
+        msg.clearUnMarshalledState();
+        assertNull(msg.properties);
+
+        // corrupt the buffer
+        ByteSequenceData.writeIntBig(msg.marshalledProperties, 100000);
+
+        try {
+            // this will trigger unmarshalling
+            msg.getProperty("test");
+            fail("Should have thrown exception");
+        } catch (ActiveMQUnmarshalEOFException e) {
+            // expected
+        }
+    }
 }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQObjectMessageTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQObjectMessageTest.java
index fb8d46e1f3..c03fa938c2 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQObjectMessageTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQObjectMessageTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.command;
 
+
+import static org.mockito.Mockito.mock;
+
 import java.io.IOException;
 
 import jakarta.jms.JMSException;
@@ -23,7 +26,10 @@ import jakarta.jms.MessageNotReadableException;
 import jakarta.jms.MessageNotWriteableException;
 
 import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.test.annotations.ParallelTest;
+import org.apache.activemq.util.ByteSequenceData;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.junit.experimental.categories.Category;
 
 /**
@@ -126,4 +132,28 @@ public class ActiveMQObjectMessageTest extends TestCase {
         }
     }
 
+    public void testUnCompressedException() throws Exception {
+        ActiveMQConnection connection = mock(ActiveMQConnection.class);
+
+        ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
+        msg.setConnection(connection);
+        msg.setObject("test");
+
+        // store and marshal
+        msg.storeContentAndClear();
+        assertNull(msg.object);
+
+        // corrupt the buffer
+        ByteSequenceData.writeIntBig(msg.content, 1000);
+
+        try {
+            // trigger unmarshalling the object
+            msg.getObject();
+            fail("Should have thrown exception");
+        } catch (JMSException e) {
+            // uncompressed will have an error from the JDK deserialization
+            assertTrue(ExceptionUtils.getRootCause(e) instanceof IOException);
+        }
+    }
+
 }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java
index e75a9d1ade..edac458fe0 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java
@@ -18,8 +18,10 @@ package org.apache.activemq.command;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 import jakarta.jms.JMSException;
 import jakarta.jms.MessageEOFException;
@@ -27,7 +29,11 @@ import jakarta.jms.MessageFormatException;
 import jakarta.jms.MessageNotReadableException;
 import jakarta.jms.MessageNotWriteableException;
 
+import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.test.annotations.ParallelTest;
+import org.apache.activemq.util.ByteSequenceData;
+import org.apache.activemq.util.MarshallingSupport.ActiveMQUnmarshalEOFException;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -1075,4 +1081,28 @@ public class ActiveMQStreamMessageTest {
             message.readBoolean();
         } catch (MessageEOFException ex) {}
     }
+
+    @Test
+    public void testUnmarshalException() throws Exception {
+        ActiveMQConnection connection = mock(ActiveMQConnection.class);
+
+        ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
+        msg.setConnection(connection);
+        msg.writeBytes("Test".getBytes());
+
+        // store and marshal
+        msg.reset();
+        assertNull(msg.dataOut);
+
+        // corrupt the buffer
+        ByteSequenceData.writeIntBig(msg.content, 1000000);
+
+        try {
+            msg.readBytes(new byte[1024]);
+            fail("Should have thrown exception");
+        } catch (JMSException e) {
+            // expected
+            assertTrue(e instanceof MessageFormatException);
+        }
+    }
 }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java
index c331f8a26b..2ea8695d1a 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.command;
 
+import static org.mockito.Mockito.mock;
+
 import java.beans.Transient;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -28,10 +30,14 @@ import jakarta.jms.MessageNotWriteableException;
 import junit.framework.TestCase;
 import junit.textui.TestRunner;
 
+import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.test.annotations.ParallelTest;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.ByteSequenceData;
 import org.apache.activemq.util.MarshallingSupport;
+import org.apache.activemq.util.MarshallingSupport.ActiveMQUnmarshalEOFException;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.junit.experimental.categories.Category;
 
 /**
@@ -158,6 +164,30 @@ public class ActiveMQTextMessageTest extends TestCase {
         Method method = ActiveMQTextMessage.class.getMethod("getRegionDestination");
         assertTrue(method.isAnnotationPresent(Transient.class));
     }
+
+    public void testUnUnmarshalException() throws Exception {
+        ActiveMQConnection connection = mock(ActiveMQConnection.class);
+
+        ActiveMQTextMessage msg = new ActiveMQTextMessage();
+        msg.setConnection(connection);
+        msg.setText("content");
+
+        // store and marshal
+        msg.storeContentAndClear();
+        assertNull(msg.text);
+
+        // corrupt the buffer
+        ByteSequenceData.writeIntBig(msg.content, 1000);
+
+        try {
+            msg.getText();
+            fail("Should have thrown exception");
+        } catch (JMSException e) {
+            // expected
+            assertTrue(
+                    ExceptionUtils.getRootCause(e) instanceof ActiveMQUnmarshalEOFException);
+        }
+    }
     
     protected void setContent(Message message, String text) throws Exception {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to