This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-6.2.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-6.2.x by this push:
     new afd714e6ad Add support for maxInflateDataSize to limit uncompressed 
buffers (#2139) (#2147)
afd714e6ad is described below

commit afd714e6ad90208e5838fef01b3bb586db35faa4
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Tue Jun 23 13:43:21 2026 -0400

    Add support for maxInflateDataSize to limit uncompressed buffers (#2139) 
(#2147)
    
    This change adds support for limiting the maximum inflation size of a
    message body when a message needs to be decompressed. This new setting
    will help prevent OOM errors from large buffers being allocated.
    
    The primary concern this is addressing is that compressed messages may
    be smaller than maxFrameSize and the broker will accept them, but if an
    event triggers a decompression a huge buffer could be created and cause
    OOM.
    
    The broker will have a new maxInflateDataSize config that is broker wide
    because te value isn't tied to a protocol and it will default to 100 MB.
    Clients are tied to a transport, so it makes more sense to make it as a
    ratio of maxFrameSize. The default is 10x maxFrameSize if configured
    which should be enough of a buffer under normal circumstances. The ratio
    can be changed using maxInflatedDataSizeRatio on a connection.
    
    (cherry picked from commit 94a40565f146fa3e134085637f4f676f9cc6e16b)
---
 .../transport/amqp/message/AmqpMessageSupport.java |  16 ++-
 .../transport/amqp/JMSInteroperabilityTest.java    |  36 ++++++
 .../message/JMSMappingOutboundTransformerTest.java |  26 ++++-
 .../org/apache/activemq/broker/BrokerService.java  |  14 +++
 .../activemq/broker/region/BaseDestination.java    |   5 +
 .../activemq/broker/region/DestinationFilter.java  |   5 +
 .../org/apache/activemq/ActiveMQConnection.java    |  24 ++++
 .../apache/activemq/ActiveMQConnectionFactory.java |  21 ++++
 .../activemq/command/ActiveMQBytesMessage.java     |   9 +-
 .../activemq/command/ActiveMQMapMessage.java       |   3 +-
 .../activemq/command/ActiveMQObjectMessage.java    |   4 +-
 .../activemq/command/ActiveMQStreamMessage.java    |  12 +-
 .../activemq/command/ActiveMQTextMessage.java      |   3 +-
 .../java/org/apache/activemq/command/Message.java  |  15 ++-
 .../apache/activemq/openwire/OpenWireFormat.java   |   1 +
 .../apache/activemq/util/MarshallingSupport.java   |  24 +++-
 .../transport/mqtt/MQTTProtocolConverter.java      |   5 +
 .../apache/activemq/transport/mqtt/MQTTTest.java   |  47 ++++++++
 .../apache/activemq/transport/stomp/StompTest.java |  52 +++++++++
 .../activemq/command/ActiveMQBytesMessageTest.java |  31 ++++++
 .../activemq/command/ActiveMQMapMessageTest.java   |  13 ++-
 .../command/ActiveMQObjectMessageTest.java         |  20 +++-
 .../command/ActiveMQStreamMessageTest.java         |  20 +++-
 .../activemq/command/ActiveMQTextMessageTest.java  |  15 ++-
 .../activemq/command/MessageCompressionTest.java   | 123 +++++++++++++++++++--
 25 files changed, 516 insertions(+), 28 deletions(-)

diff --git 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
index eedb464f72..102a1701a5 100644
--- 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
+++ 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
@@ -18,6 +18,7 @@ package org.apache.activemq.transport.amqp.message;
 
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.nio.charset.Charset;
@@ -38,6 +39,7 @@ import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.util.MarshallingSupport;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Data;
@@ -213,6 +215,12 @@ public final class AmqpMessageSupport {
 
             if (message.isCompressed()) {
                 int length = (int) message.getBodyLength();
+                // before we allocate the buffer ensure it's not too large
+                try {
+                    
MarshallingSupport.validateMaxInflatedDataSize(message.getMaxInflatedDataSize(),
 length);
+                } catch (IOException cause) {
+                    throw JMSExceptionSupport.create(cause);
+                }
                 byte[] uncompressed = new byte[length];
                 message.readBytes(uncompressed);
 
@@ -244,7 +252,9 @@ public final class AmqpMessageSupport {
             if (message.isCompressed()) {
                 try (ByteArrayOutputStream os = new ByteArrayOutputStream();
                      ByteArrayInputStream is = new 
ByteArrayInputStream(contents);
-                     InflaterInputStream iis = new InflaterInputStream(is);) {
+                     // wrap to prevent allocating more than 
maxInflatedDataSize
+                     InputStream iis = 
MarshallingSupport.createInflaterInputStream(
+                            message.getMaxInflatedDataSize(), is)) {
 
                     byte value;
                     while ((value = (byte) iis.read()) != -1) {
@@ -282,10 +292,14 @@ public final class AmqpMessageSupport {
 
             if (message.isCompressed()) {
                 try (ByteArrayInputStream is = new 
ByteArrayInputStream(contents);
+                     // We do not need to wrap this stream, the size is 
validated below
+                     // before allocation
                      InflaterInputStream iis = new InflaterInputStream(is);
                      DataInputStream dis = new DataInputStream(iis);) {
 
                     int size = dis.readInt();
+                    // before we allocate the buffer ensure it's not too large
+                    
MarshallingSupport.validateMaxInflatedDataSize(message.getMaxInflatedDataSize(),
 size);
                     byte[] uncompressed = new byte[size];
                     dis.readFully(uncompressed);
 
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
index 415632cab7..53e19ea1af 100644
--- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
@@ -521,6 +521,42 @@ public class JMSInteroperabilityTest extends 
JMSClientTestSupport {
         openwire.close();
     }
 
+    @Test
+    public void testOpenWireToQpidCompressionFailure() throws Exception {
+
+        // Raw Transformer doesn't expand message properties.
+        assumeFalse(!transformer.equals("jms"));
+
+        // set to 512 bytes
+        brokerService.setMaxInflatedDataSize(512);
+
+        try (Connection openwire = createJMSConnection(); Connection amqp = 
createConnection()) {
+            ((ActiveMQConnection) openwire).setUseCompression(true);
+            openwire.start();
+            amqp.start();
+
+            Session openwireSession = openwire.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Session amqpSession = amqp.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            Destination queue = 
openwireSession.createQueue(getDestinationName());
+            MessageProducer openwireProducer = 
openwireSession.createProducer(queue);
+            MessageConsumer amqpConsumer = amqpSession.createConsumer(queue);
+
+            StringBuilder builder = new StringBuilder();
+            // generate a string longer than 512 bytes
+            for (int i = 1; i <= 50; i++) {
+                builder.append("compresedpayload");
+            }
+            // Create and send the Message
+            
openwireProducer.send(openwireSession.createTextMessage(builder.toString()));
+
+            // There should be an error triggered on dispatch during 
decompression
+            // and message should go to the DLQ
+            assertNull(amqpConsumer.receive(1000));
+            assertTrue(sentToDlq.get());
+        }
+    }
+
     // The following tests for corruption will corrupt the headers or body
     // to test that the AMQP protocol correctly passes the error during
     // dispatch to allow the Transport Connection to properly handle
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
index ea43554482..c9da7149c9 100644
--- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
@@ -32,6 +32,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -754,8 +755,23 @@ public class JMSMappingOutboundTransformerTest {
 
     @Test
     public void testConvertCompressedTextMessageCreatesDataSectionBody() 
throws Exception {
+        ActiveMQTextMessage outbound = 
createTextMessage("myTextMessageContent", true);
+        ActiveMQConnection connection = Mockito.mock(ActiveMQConnection.class);
+        Mockito.when(connection.isUseCompression()).thenReturn(true);
+        // override to trigger an error with decompression
+        Mockito.when(connection.getMaxInflatedDataSize()).thenReturn(10);
+        outbound.setConnection(connection);
+        assertThrows(JMSException.class, () -> 
testConvertCompressedTextMessageCreatesDataSectionBody(outbound));
+    }
+
+    @Test
+    public void 
testConvertCompressedTextMessageCreatesDataSectionBodyMaxInflatedSize() throws 
Exception {
+        ActiveMQTextMessage outbound = 
createTextMessage("myTextMessageContent", true);
+        testConvertCompressedTextMessageCreatesDataSectionBody(outbound);
+    }
+
+    private void 
testConvertCompressedTextMessageCreatesDataSectionBody(ActiveMQTextMessage 
outbound) throws Exception {
         String contentString = "myTextMessageContent";
-        ActiveMQTextMessage outbound = createTextMessage(contentString, true);
         outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
         outbound.onSend();
         outbound.storeContent();
@@ -775,7 +791,8 @@ public class JMSMappingOutboundTransformerTest {
         String contents = new String(data.getArray(), data.getArrayOffset(), 
data.getLength(), StandardCharsets.UTF_8);
         assertEquals(contentString, contents);
     }
-    
+
+
     @Test 
     public void testConvertConnectionInfo() throws Exception {
        String connectionId = "myConnectionId";
@@ -1014,6 +1031,7 @@ public class JMSMappingOutboundTransformerTest {
         if (compression) {
             ActiveMQConnection connection = 
Mockito.mock(ActiveMQConnection.class);
             Mockito.when(connection.isUseCompression()).thenReturn(true);
+            
Mockito.when(connection.getMaxInflatedDataSize()).thenReturn(Integer.MAX_VALUE);
             message.setConnection(connection);
         }
 
@@ -1030,6 +1048,7 @@ public class JMSMappingOutboundTransformerTest {
         if (compression) {
             ActiveMQConnection connection = 
Mockito.mock(ActiveMQConnection.class);
             Mockito.when(connection.isUseCompression()).thenReturn(true);
+            
Mockito.when(connection.getMaxInflatedDataSize()).thenReturn(Integer.MAX_VALUE);
             message.setConnection(connection);
         }
 
@@ -1046,6 +1065,7 @@ public class JMSMappingOutboundTransformerTest {
         if (compression) {
             ActiveMQConnection connection = 
Mockito.mock(ActiveMQConnection.class);
             Mockito.when(connection.isUseCompression()).thenReturn(true);
+            
Mockito.when(connection.getMaxInflatedDataSize()).thenReturn(Integer.MAX_VALUE);
             message.setConnection(connection);
         }
 
@@ -1066,6 +1086,7 @@ public class JMSMappingOutboundTransformerTest {
         if (compression) {
             ActiveMQConnection connection = 
Mockito.mock(ActiveMQConnection.class);
             Mockito.when(connection.isUseCompression()).thenReturn(true);
+            
Mockito.when(connection.getMaxInflatedDataSize()).thenReturn(Integer.MAX_VALUE);
             result.setConnection(connection);
         }
 
@@ -1100,6 +1121,7 @@ public class JMSMappingOutboundTransformerTest {
         if (compression) {
             ActiveMQConnection connection = 
Mockito.mock(ActiveMQConnection.class);
             Mockito.when(connection.isUseCompression()).thenReturn(true);
+            
Mockito.when(connection.getMaxInflatedDataSize()).thenReturn(Integer.MAX_VALUE);
             result.setConnection(connection);
         }
 
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 4cb353ba3c..06958618f0 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -269,6 +269,7 @@ public class BrokerService implements Service {
     private final List<Runnable> preShutdownHooks = new 
CopyOnWriteArrayList<>();
 
     private int maxUncommittedCount = DEFAULT_MAX_UNCOMMITTED_COUNT;
+    private int maxInflatedDataSize = 
OpenWireFormat.DEFAULT_MAX_INFLATED_DATA_SIZE;
 
     static {
 
@@ -3349,4 +3350,17 @@ public class BrokerService implements Service {
         this.maxUncommittedCount = maxUncommittedCount;
     }
 
+    public int getMaxInflatedDataSize() {
+        return maxInflatedDataSize;
+    }
+
+    /**
+     * Set the maximum size that a compressed message can inflate to
+     * if a message has to be decompressed.
+     *
+     * @param maxInflatedDataSize
+     */
+    public void setMaxInflatedDataSize(int maxInflatedDataSize) {
+        this.maxInflatedDataSize = maxInflatedDataSize;
+    }
 }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 6aaa317fe4..e98156fa48 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -932,4 +932,9 @@ public abstract class BaseDestination implements 
Destination {
     public void setMessageInterceptorStrategy(MessageInterceptorStrategy 
messageInterceptorStrategy) {
         this.messageInterceptorStrategy = messageInterceptorStrategy;
     }
+
+    @Override
+    public int getMaxInflatedDataSize() {
+        return brokerService.getMaxInflatedDataSize();
+    }
 }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
index 1ab96560ac..154f013d6c 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
@@ -439,6 +439,11 @@ public class DestinationFilter implements Destination {
         }
     }
 
+    @Override
+    public int getMaxInflatedDataSize() {
+        return next.getMaxInflatedDataSize();
+    }
+
     public Destination getNext() {
         return next;
     }
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java 
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index c2fe20ec29..3d9bec69bb 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -145,6 +145,9 @@ public class ActiveMQConnection implements Connection, 
TopicConnection, QueueCon
     private boolean optimizedMessageDispatch = true;
     private boolean copyMessageOnSend = true;
     private boolean useCompression;
+    private double maxInflatedDataSizeRatio = 
ActiveMQConnectionFactory.DEFAULT_MAX_INFLATED_DATA_SIZE_RATIO;
+    // This will be configured during negotiation if maxFrameSize has been 
configured.
+    private int maxInflatedDataSize = Integer.MAX_VALUE;
     private boolean objectMessageSerializationDefered;
     private boolean useAsyncSend;
     private boolean optimizeAcknowledge;
@@ -2006,6 +2009,15 @@ public class ActiveMQConnection implements Connection, 
TopicConnection, QueueCon
         if(tmpMaxFrameSize > 0) {
             maxFrameSize.set(tmpMaxFrameSize);
         }
+
+        // Compute the maxInflatedData size as a ratio of maxFrameSize
+        // This prevents overflow and sets to Integer.MAX_VALUE if too large
+        double updatedMaxInflated = (double)tmpMaxFrameSize * 
maxInflatedDataSizeRatio;
+        if (Double.isInfinite(updatedMaxInflated) || updatedMaxInflated > 
Integer.MAX_VALUE) {
+            this.maxInflatedDataSize = Integer.MAX_VALUE;
+        } else {
+            this.maxInflatedDataSize = (int) updatedMaxInflated;
+        }
     }
 
     /**
@@ -2200,6 +2212,18 @@ public class ActiveMQConnection implements Connection, 
TopicConnection, QueueCon
         this.useCompression = useCompression;
     }
 
+    public int getMaxInflatedDataSize() {
+        return maxInflatedDataSize;
+    }
+
+    public double getMaxInflatedDataSizeRatio() {
+        return maxInflatedDataSizeRatio;
+    }
+
+    public void setMaxInflatedDataSizeRatio(double maxInflatedDataSizeRatio) {
+        this.maxInflatedDataSizeRatio = maxInflatedDataSizeRatio;
+    }
+
     public void destroyDestination(ActiveMQDestination destination) throws 
JMSException {
 
         checkClosedOrFailed();
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
 
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
index ae57d2624b..6c4bb81104 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
@@ -99,6 +99,9 @@ public class ActiveMQConnectionFactory extends 
JNDIBaseStorable implements Conne
     public static final String DEFAULT_USER = null;
     public static final String DEFAULT_PASSWORD = null;
     public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0;
+    // The default ratio for maxInflatedDataSize. The default is 10x the size
+    // of maxFrameSize
+    public static final double DEFAULT_MAX_INFLATED_DATA_SIZE_RATIO = 10.0;
 
     protected URI brokerURL;
     protected String userName;
@@ -129,6 +132,7 @@ public class ActiveMQConnectionFactory extends 
JNDIBaseStorable implements Conne
     private long optimizedAckScheduledAckInterval = 0;
     private boolean copyMessageOnSend = true;
     private boolean useCompression;
+    private double maxInflatedDataSizeRatio = 
DEFAULT_MAX_INFLATED_DATA_SIZE_RATIO;
     private boolean objectMessageSerializationDefered;
     private boolean useAsyncSend;
     private boolean optimizeAcknowledge;
@@ -417,6 +421,7 @@ public class ActiveMQConnectionFactory extends 
JNDIBaseStorable implements Conne
         connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
         connection.setCopyMessageOnSend(isCopyMessageOnSend());
         connection.setUseCompression(isUseCompression());
+        connection.setMaxInflatedDataSizeRatio(getMaxInflatedDataSizeRatio());
         
connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
         connection.setDispatchAsync(isDispatchAsync());
         connection.setUseAsyncSend(isUseAsyncSend());
@@ -850,6 +855,7 @@ public class ActiveMQConnectionFactory extends 
JNDIBaseStorable implements Conne
 
         props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend()));
         props.setProperty("useCompression", 
Boolean.toString(isUseCompression()));
+        props.setProperty("maxInflatedDataSizeRatio", 
Double.toString(getMaxInflatedDataSizeRatio()));
         props.setProperty("useRetroactiveConsumer", 
Boolean.toString(isUseRetroactiveConsumer()));
         props.setProperty("watchTopicAdvisories", 
Boolean.toString(isWatchTopicAdvisories()));
 
@@ -890,6 +896,21 @@ public class ActiveMQConnectionFactory extends 
JNDIBaseStorable implements Conne
         this.useCompression = useCompression;
     }
 
+    public double getMaxInflatedDataSizeRatio() {
+        return maxInflatedDataSizeRatio;
+    }
+
+    /**
+     * Set the ratio to use to compute maxInflatedDataSize which controls
+     * how large a decompressed message buffer can be. maxInflatedDataSize
+     * is computed as maxFrameSize * maxInflatedDataSizeRatio.
+     *
+     * @param maxInflatedDataSizeRatio
+     */
+    public void setMaxInflatedDataSizeRatio(double maxInflatedDataSizeRatio) {
+        this.maxInflatedDataSizeRatio = maxInflatedDataSizeRatio;
+    }
+
     public boolean isObjectMessageSerializationDefered() {
         return objectMessageSerializationDefered;
     }
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
index 98517f1b39..822d764365 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
@@ -39,6 +39,7 @@ import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.ByteSequenceData;
 import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.util.MarshallingSupport;
 
 /**
  * A <CODE>BytesMessage</CODE> object is used to send a message containing a
@@ -901,11 +902,15 @@ public class ActiveMQBytesMessage extends ActiveMQMessage 
implements BytesMessag
         ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
         try {
             length = ByteSequenceData.readIntBig(dataSequence);
+            // verify the length of the buffer is not larger than 
maxInflatedDataSize
+            
MarshallingSupport.validateMaxInflatedDataSize(getMaxInflatedDataSize(), 
length);
             dataSequence.offset = 0;
-            byte[] data = Arrays.copyOfRange(dataSequence.getData(), 4, 
dataSequence.getLength());
-            inflater.setInput(data);
+            inflater.setInput(dataSequence.getData(), 4, 
dataSequence.getLength() - 4);
             byte[] buffer = new byte[length];
             int count = inflater.inflate(buffer);
+            if (count != length) {
+                throw new IllegalStateException("Inflated buffer size is 
different than expected size of " + length);
+            }
             decompressed.write(buffer, 0, count);
             return decompressed.toByteArray();
         } catch (Exception e) {
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
index 140431573f..6f5473f659 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
@@ -189,7 +189,8 @@ public class ActiveMQMapMessage extends ActiveMQMessage 
implements MapMessage {
             if (content != null) {
                 InputStream is = new ByteArrayInputStream(content);
                 if (isCompressed()) {
-                    is = MarshallingSupport.createInflaterInputStream(is);
+                    // wrap the stream so we don't inflate past 
maxInflatedDataSize
+                    is = 
MarshallingSupport.createInflaterInputStream(getMaxInflatedDataSize(), is);
                 }
                 DataInputStream dataIn = new DataInputStream(is);
                 map = MarshallingSupport.unmarshalPrimitiveMap(dataIn);
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
index 79cbf4c0d5..395dcca554 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
@@ -38,6 +38,7 @@ import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.ClassLoadingAwareObjectInputStream;
 import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.util.MarshallingSupport;
 import org.apache.activemq.wireformat.WireFormat;
 
 /**
@@ -208,7 +209,8 @@ public class ActiveMQObjectMessage extends ActiveMQMessage 
implements ObjectMess
             try {
                 InputStream is = new ByteArrayInputStream(content);
                 if (isCompressed()) {
-                    is = new InflaterInputStream(is);
+                    // wrap the stream so we don't inflate past 
maxInflatedDataSize
+                    is = 
MarshallingSupport.createInflaterInputStream(getMaxInflatedDataSize(), is);
                 }
                 DataInputStream dataIn = new DataInputStream(is);
                 ClassLoadingAwareObjectInputStream objIn = new 
ClassLoadingAwareObjectInputStream(dataIn);
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
index 1cd0d70e97..19640dad4e 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
@@ -833,6 +833,8 @@ public class ActiveMQStreamMessage extends ActiveMQMessage 
implements StreamMess
             }
             if (type == MarshallingSupport.BYTE_ARRAY_TYPE) {
                 int len = this.dataIn.readInt();
+                // verify that there are enough bytes remaining before 
allocation
+                MarshallingSupport.validateBufferSizeRemaining(dataIn, len);
                 byte[] value = new byte[len];
                 this.dataIn.readFully(value);
                 return value;
@@ -1165,10 +1167,15 @@ public class ActiveMQStreamMessage extends 
ActiveMQMessage implements StreamMess
                 if (compressed) {
                     ByteArrayInputStream input = new 
ByteArrayInputStream(this.content.getData(), this.content.getOffset(), 
this.content.getLength());
                     InflaterInputStream inflater = new 
InflaterInputStream(input);
+                    int total = 0;
                     try {
                         byte[] buffer = new byte[8*1024];
                         int read = 0;
                         while ((read = inflater.read(buffer)) != -1) {
+                            total = Math.addExact(total, read);
+                            // each time through the loop see if we are >= max 
inflated size so we stop
+                            // by doing this here we might go slightly pass 
the limit (up to 8 KB) but that is fine
+                            
MarshallingSupport.validateMaxInflatedDataSize(getMaxInflatedDataSize(), total);
                             this.dataOut.write(buffer, 0, read);
                         }
                     } finally {
@@ -1203,7 +1210,10 @@ public class ActiveMQStreamMessage extends 
ActiveMQMessage implements StreamMess
             if (isCompressed()) {
                 is = new InflaterInputStream(is);
                 is = new BufferedInputStream(is);
-                is = 
MarshallingSupport.createFrameLimitedInputStream(Integer.MAX_VALUE, is);
+                // Wrap the buffered stream in a frame limited stream so we 
can error if we exceed
+                // max inflate size
+                is = 
MarshallingSupport.createFrameLimitedInputStream(getMaxInflatedDataSize(), is);
+
             }
             this.dataIn = new DataInputStream(is);
         }
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
index 5e73c3314b..a5df4164f0 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
@@ -95,7 +95,8 @@ public class ActiveMQTextMessage extends ActiveMQMessage 
implements TextMessage
             try {
                 is = new ByteArrayInputStream(bodyAsBytes);
                 if (isCompressed()) {
-                    is = MarshallingSupport.createInflaterInputStream(is);
+                    // wrap the stream so we don't inflate past 
maxInflatedDataSize
+                    is = 
MarshallingSupport.createInflaterInputStream(getMaxInflatedDataSize(), is);
                 }
                 DataInputStream dataIn = new DataInputStream(is);
                 text = MarshallingSupport.readUTF8(dataIn);
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/command/Message.java 
b/activemq-client/src/main/java/org/apache/activemq/command/Message.java
index 2a31047c9e..06e0aebeea 100644
--- a/activemq-client/src/main/java/org/apache/activemq/command/Message.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/Message.java
@@ -33,6 +33,7 @@ import jakarta.jms.JMSException;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.activemq.util.ByteArrayOutputStream;
@@ -102,9 +103,10 @@ public abstract class Message extends BaseCommand 
implements MarshallAware, Mess
     private BrokerId[] brokerPath;
     private BrokerId[] cluster;
 
-    public static interface MessageDestination {
+    public interface MessageDestination {
         int getMinimumMessageSize();
         MemoryUsage getMemoryUsage();
+        int getMaxInflatedDataSize();
     }
 
     public abstract Message copy();
@@ -871,4 +873,15 @@ public abstract class Message extends BaseCommand 
implements MarshallAware, Mess
         }
         return this;
     }
+
+    public int getMaxInflatedDataSize() {
+        // If this is set then this is on a broker
+        if (regionDestination != null) {
+            return regionDestination.getMaxInflatedDataSize();
+            // connection is set on Clients
+        } else if (connection != null) {
+            return connection.getMaxInflatedDataSize();
+        }
+        return OpenWireFormat.DEFAULT_MAX_INFLATED_DATA_SIZE;
+    }
 }
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
 
b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
index 86171dc288..a927ed8f4c 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
@@ -43,6 +43,7 @@ public final class OpenWireFormat implements WireFormat {
     public static final int DEFAULT_WIRE_VERSION = 
CommandTypes.PROTOCOL_VERSION;
     public static final int DEFAULT_LEGACY_VERSION = 
CommandTypes.PROTOCOL_LEGACY_STORE_VERSION;
     public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE;
+    public static final int DEFAULT_MAX_INFLATED_DATA_SIZE = 1024 * 1024 * 100;
 
     static final byte NULL_TYPE = CommandTypes.NULL;
     private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2;
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/util/MarshallingSupport.java
 
b/activemq-client/src/main/java/org/apache/activemq/util/MarshallingSupport.java
index c7bc76f02f..43eed3aa71 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/util/MarshallingSupport.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/util/MarshallingSupport.java
@@ -60,15 +60,27 @@ public final class MarshallingSupport {
 
     private MarshallingSupport() {}
 
-    // TODO: This will be limited in a future PR to something besides 
Integer.MAX_VALUE
-    public static InputStream createInflaterInputStream(InputStream is) {
-        return createFrameLimitedInputStream(Integer.MAX_VALUE, new 
InflaterInputStream(is));
+    public static InputStream createInflaterInputStream(int maxAvailable, 
InputStream is) {
+        return createFrameLimitedInputStream(maxAvailable, new 
InflaterInputStream(is));
     }
 
     public static InputStream createFrameLimitedInputStream(int maxAvailable, 
InputStream is) {
         return new FrameSizeLimitedFilterInputStream(maxAvailable, is);
     }
 
+    // Validate that the size value is not greater than the max available size
+    public static void validateMaxInflatedDataSize(int maxAvailable, int size) 
throws IOException {
+        if (size > maxAvailable) {
+            throw new MaxInflatedDataSizeExceededException(
+                    "Cannot read more than the uncompressed size bytes: 
requested " + size);
+        }
+    }
+
+    // Validate the size value is not greater than the remaining bytes in the 
stream
+    public static void validateBufferSizeRemaining(DataInputStream stream, int 
size) throws IOException {
+        validateBufferSize(stream, Integer.MAX_VALUE, size);
+    }
+
     public static void marshalPrimitiveMap(Map<String, Object> map, 
DataOutputStream out) throws IOException {
         if (map == null) {
             out.writeInt(-1);
@@ -486,4 +498,10 @@ public final class MarshallingSupport {
         }
     }
 
+    public static class MaxInflatedDataSizeExceededException extends 
ActiveMQUnmarshalEOFException {
+        public MaxInflatedDataSizeExceededException(String message) {
+            super(message);
+        }
+    }
+
 }
diff --git 
a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
 
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index badd991217..0bae2e8b82 100644
--- 
a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ 
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -60,6 +60,7 @@ import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.JMSExceptionSupport;
 import org.apache.activemq.util.LRUCache;
 import org.apache.activemq.util.LongSequenceGenerator;
+import org.apache.activemq.util.MarshallingSupport;
 import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtbuf.UTF8Buffer;
 import org.fusesource.mqtt.client.QoS;
@@ -636,8 +637,12 @@ public class MQTTProtocolConverter {
                     inflater.setInput(byteSequence.data, byteSequence.offset, 
byteSequence.length);
                     byte[] data = new byte[4096];
                     int read;
+                    int total = 0;
                     ByteArrayOutputStream bytesOut = new 
ByteArrayOutputStream();
                     while ((read = inflater.inflate(data)) != 0) {
+                        total = Math.addExact(total, read);
+                        // check if we have exceeded maxInflatedSize before 
continuing
+                        
MarshallingSupport.validateMaxInflatedDataSize(message.getMaxInflatedDataSize(),
 total);
                         bytesOut.write(data, 0, read);
                     }
                     byteSequence = bytesOut.toByteSequence();
diff --git 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 7868cd4ce5..72974a159f 100644
--- 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -1186,6 +1186,53 @@ public class MQTTTest extends MQTTTestSupport {
                 .getDestinationStatistics().getMessages().getCount() == 0, 
500, 10));
     }
 
+    @Test
+    public void testMaxInflatedDataSizeErrorBytes() throws Exception {
+        testMaxInflatedDataSizeError(true);
+    }
+
+    @Test
+    public void testMaxInflatedDataSizeErrorText() throws Exception {
+        testMaxInflatedDataSizeError(false);
+    }
+
+    private void testMaxInflatedDataSizeError(boolean bytes) throws Exception {
+        final MQTTClientProvider provider = getMQTTClientProvider();
+        initializeConnection(provider);
+
+        brokerService.setMaxInflatedDataSize(10);
+        String destinationName = "foo.far";
+        ActiveMQConnection activeMQConnection = (ActiveMQConnection) 
cf.createConnection();
+        activeMQConnection.setUseCompression(true);
+        activeMQConnection.start();
+        Session s = activeMQConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        jakarta.jms.Topic jmsTopic = s.createTopic(destinationName);
+        MessageProducer producer = s.createProducer(jmsTopic);
+
+        provider.subscribe("foo/+", AT_MOST_ONCE);
+        ActiveMQMessage sendMessage;
+        if (bytes) {
+            BytesMessage bytesMessage = s.createBytesMessage();
+            bytesMessage.writeBytes("bodybodybodybodybody".getBytes());
+            sendMessage = (ActiveMQMessage) bytesMessage;
+        } else {
+            sendMessage = (ActiveMQMessage) 
s.createTextMessage("bodybodybodybodybody");
+        }
+        // marshal and clear so the broker will have to decompress
+        sendMessage.storeContentAndClear();
+        producer.send(sendMessage);
+
+        byte[] message = provider.receive(1000);
+        assertNull("Should not get message", message);
+
+        provider.disconnect();
+        activeMQConnection.close();
+
+        // verify message is gone off the dest
+        assertTrue(Wait.waitFor(() -> brokerService.getDestination(new 
ActiveMQTopic(destinationName))
+                .getDestinationStatistics().getMessages().getCount() == 0, 
500, 10));
+    }
+
     @Test(timeout = 60 * 1000)
     public void testPingKeepsInactivityMonitorAlive() throws Exception {
         MQTT mqtt = createMQTTConnection();
diff --git 
a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
 
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
index 39469befec..7d811f105f 100644
--- 
a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
+++ 
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
@@ -375,6 +375,58 @@ public class StompTest extends StompTestSupport {
         assertTrue(sentToDlq.get());
     }
 
+    @Test(timeout = 60000)
+    public void testMaxInflatedDataSizeErrorDlqText() throws Exception {
+        testMaxInflatedDataSizeErrorDlq(false);
+    }
+
+    @Test(timeout = 60000)
+    public void testMaxInflatedDataSizeErrorDlqBytes() throws Exception {
+        testMaxInflatedDataSizeErrorDlq(true);
+    }
+
+    private void testMaxInflatedDataSizeErrorDlq(boolean bytes) throws 
Exception {
+        String body = "testtesttesttesttesttest";
+
+        // set a tiny max size to trigger an error on dispatch
+        brokerService.setMaxInflatedDataSize(10);
+        ((ActiveMQConnection)connection).setUseCompression(true);
+        MessageProducer producer = session.createProducer(queue);
+
+        String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" 
+ Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+        frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" 
+ "ack:auto\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        // marshal and clear so the broker will have to decompress
+        ActiveMQMessage m;
+        if (bytes) {
+            BytesMessage bytesMessage = session.createBytesMessage();
+            bytesMessage.writeBytes(body.getBytes());
+            m = (ActiveMQMessage) bytesMessage;
+        } else {
+            m = (ActiveMQMessage) session.createTextMessage(body);
+        }
+        m.storeContentAndClear();
+        producer.send(m);
+
+        // Message should be DLQ'd because it exceeds max inflated data size
+        try {
+            StompFrame frameNull = stompConnection.receive(500);
+            if (frameNull != null) {
+                fail("Should not have received any messages");
+            }
+        } catch (SocketTimeoutException soe) {}
+
+        // verify message is gone off the dest and went to the DLQ
+        assertTrue(Wait.waitFor(() -> brokerService.getDestination(queue)
+                .getDestinationStatistics().getMessages().getCount() == 0, 
500, 10));
+        assertTrue(sentToDlq.get());
+    }
+
     @Test(timeout = 60000)
     public void testJMSXGroupIdCanBeSet() throws Exception {
 
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQBytesMessageTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQBytesMessageTest.java
index 2ace923058..4b8d54bb76 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQBytesMessageTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQBytesMessageTest.java
@@ -16,12 +16,19 @@
  */
 package org.apache.activemq.command;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import jakarta.jms.JMSException;
 import jakarta.jms.MessageFormatException;
 import jakarta.jms.MessageNotReadableException;
 import jakarta.jms.MessageNotWriteableException;
 
 import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.util.ByteSequenceData;
+import org.apache.activemq.util.ExceptionUtils;
+import 
org.apache.activemq.util.MarshallingSupport.ActiveMQUnmarshalEOFException;
 
 /**
  * 
@@ -509,4 +516,28 @@ public class ActiveMQBytesMessageTest extends TestCase {
         } catch (MessageNotReadableException e) {
         }
     }
+
+    public void testCompressedUnmarshalException() throws Exception {
+        ActiveMQConnection connection = mock(ActiveMQConnection.class);
+        when(connection.isUseCompression()).thenReturn(true);
+
+        ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
+        msg.setConnection(connection);
+        msg.writeDouble(3.3d);
+
+        // store and reset for reading
+        msg.reset();
+        assertTrue(msg.isCompressed());
+
+        // corrupt the buffer
+        ByteSequenceData.writeIntBig(msg.content, 100000);
+
+        try {
+            msg.readDouble();
+            fail("Should have thrown exception");
+        } catch (JMSException e) {
+            // expected
+            assertTrue(ExceptionUtils.getRootCause(e) instanceof 
ActiveMQUnmarshalEOFException);
+        }
+    }
 }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java
index 4eb5234b69..4f1662b93c 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java
@@ -477,7 +477,18 @@ public class ActiveMQMapMessageTest {
 
     @Test
     public void testUnmarshalException() throws Exception {
+        testUnmarshalException(false);
+    }
+
+    @Test
+    public void testCompressedUnmarshalException() throws Exception {
+        testUnmarshalException(true);
+    }
+
+    // For map messages both compressed and uncompressed need to be 
unmarshalled
+    private void testUnmarshalException(boolean compressed) throws Exception {
         ActiveMQConnection connection = mock(ActiveMQConnection.class);
+        when(connection.isUseCompression()).thenReturn(compressed);
 
         ActiveMQMapMessage msg = new ActiveMQMapMessage();
         msg.setConnection(connection);
@@ -486,6 +497,7 @@ public class ActiveMQMapMessageTest {
         // store and marshal
         msg.storeContentAndClear();
         assertTrue(msg.map.isEmpty());
+        assertEquals(compressed, msg.isCompressed());
 
         // corrupt the buffer
         ByteSequenceData.writeIntBig(msg.content, 1000);
@@ -500,5 +512,4 @@ public class ActiveMQMapMessageTest {
                     ExceptionUtils.getRootCause(e) instanceof 
ActiveMQUnmarshalEOFException);
         }
     }
-
 }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQObjectMessageTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQObjectMessageTest.java
index f898acd50e..0058c34871 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQObjectMessageTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQObjectMessageTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.command;
 
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 
@@ -28,6 +29,7 @@ import jakarta.jms.MessageNotWriteableException;
 import junit.framework.TestCase;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.util.ByteSequenceData;
+import 
org.apache.activemq.util.MarshallingSupport.ActiveMQUnmarshalEOFException;
 import org.apache.commons.lang.exception.ExceptionUtils;
 
 /**
@@ -129,8 +131,17 @@ public class ActiveMQObjectMessageTest extends TestCase {
         }
     }
 
-    public void testUnCompressedException() throws Exception {
+    public void testUnCompressedUnmarshalException() throws Exception {
+        testUnmarshalException(false);
+    }
+
+    public void testCompressedUnmarshalException() throws Exception {
+        testUnmarshalException(true);
+    }
+
+    private void testUnmarshalException(boolean compressed) throws Exception {
         ActiveMQConnection connection = mock(ActiveMQConnection.class);
+        when(connection.isUseCompression()).thenReturn(compressed);
 
         ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
         msg.setConnection(connection);
@@ -139,6 +150,7 @@ public class ActiveMQObjectMessageTest extends TestCase {
         // store and marshal
         msg.storeContentAndClear();
         assertNull(msg.object);
+        assertEquals(compressed, msg.isCompressed());
 
         // corrupt the buffer
         ByteSequenceData.writeIntBig(msg.content, 1000);
@@ -150,6 +162,12 @@ public class ActiveMQObjectMessageTest extends TestCase {
         } catch (JMSException e) {
             // uncompressed will have an error from the JDK deserialization
             assertTrue(ExceptionUtils.getRootCause(e) instanceof IOException);
+
+            // our validation causes BufferUnmarshalException for a compressed 
stream
+            if (compressed) {
+                // expected
+                assertTrue(ExceptionUtils.getRootCause(e) instanceof 
ActiveMQUnmarshalEOFException);
+            }
         }
     }
 
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java
index 1aa4eedeaf..34b94953e6 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java
@@ -18,10 +18,12 @@ package org.apache.activemq.command;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import jakarta.jms.JMSException;
 import jakarta.jms.MessageEOFException;
@@ -31,8 +33,7 @@ import jakarta.jms.MessageNotWriteableException;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.util.ByteSequenceData;
-import 
org.apache.activemq.util.MarshallingSupport.ActiveMQUnmarshalEOFException;
-import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.activemq.util.ExceptionUtils;
 import org.junit.Test;
 
 /**
@@ -1081,7 +1082,17 @@ public class ActiveMQStreamMessageTest {
 
     @Test
     public void testUnmarshalException() throws Exception {
+        testUnmarshalException(false);
+    }
+
+    @Test
+    public void testCompressedUnmarshalException() throws Exception {
+        testUnmarshalException(true);
+    }
+
+    private void testUnmarshalException(boolean compressed) throws Exception {
         ActiveMQConnection connection = mock(ActiveMQConnection.class);
+        when(connection.isUseCompression()).thenReturn(compressed);
 
         ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
         msg.setConnection(connection);
@@ -1090,6 +1101,7 @@ public class ActiveMQStreamMessageTest {
         // store and marshal
         msg.reset();
         assertNull(msg.dataOut);
+        assertEquals(compressed, msg.isCompressed());
 
         // corrupt the buffer
         ByteSequenceData.writeIntBig(msg.content, 1000000);
@@ -1098,8 +1110,8 @@ public class ActiveMQStreamMessageTest {
             msg.readBytes(new byte[1024]);
             fail("Should have thrown exception");
         } catch (JMSException e) {
-            // expected
-            assertTrue(e instanceof MessageFormatException);
+            // if this is not null then there was an expected format exception
+            assertNotNull(ExceptionUtils.createMessageFormatException(e));
         }
     }
 }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java
index f2825f0ba4..ccdea19533 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.command;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.beans.Transient;
 import java.io.DataOutputStream;
@@ -162,8 +163,19 @@ public class ActiveMQTextMessageTest extends TestCase {
         assertTrue(method.isAnnotationPresent(Transient.class));
     }
 
-    public void testUnUnmarshalException() throws Exception {
+
+    public void testUnCompressedUnmarshalException() throws Exception {
+        testUnmarshalException(false);
+    }
+
+    public void testCompressedUnmarshalException() throws Exception {
+        testUnmarshalException(true);
+    }
+
+    // For text messages both compressed and uncompressed need to be 
unmarshalled
+    private void testUnmarshalException(boolean compressed) throws Exception {
         ActiveMQConnection connection = mock(ActiveMQConnection.class);
+        when(connection.isUseCompression()).thenReturn(compressed);
 
         ActiveMQTextMessage msg = new ActiveMQTextMessage();
         msg.setConnection(connection);
@@ -172,6 +184,7 @@ public class ActiveMQTextMessageTest extends TestCase {
         // store and marshal
         msg.storeContentAndClear();
         assertNull(msg.text);
+        assertEquals(compressed, msg.isCompressed());
 
         // corrupt the buffer
         ByteSequenceData.writeIntBig(msg.content, 1000);
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
index 5ba84df4d6..2799556994 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
@@ -16,6 +16,12 @@
  */
 package org.apache.activemq.command;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 import java.io.UnsupportedEncodingException;
 
 import jakarta.jms.BytesMessage;
@@ -24,13 +30,27 @@ import jakarta.jms.MessageConsumer;
 import jakarta.jms.MessageProducer;
 import jakarta.jms.Session;
 
-import junit.framework.TestCase;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
 import org.apache.activemq.broker.BrokerService;
-
-public class MessageCompressionTest extends TestCase {
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
+import 
org.apache.activemq.util.MarshallingSupport.ActiveMQUnmarshalEOFException;
+import 
org.apache.activemq.util.MarshallingSupport.MaxInflatedDataSizeExceededException;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MessageCompressionTest {
 
     private static final String BROKER_URL = "tcp://localhost:0";
     // The following text should compress well
@@ -47,20 +67,58 @@ public class MessageCompressionTest extends TestCase {
     private BrokerService broker;
     private ActiveMQQueue queue;
     private String connectionUri;
+    private final AtomicBoolean throwMaxInflatedException = new 
AtomicBoolean(false);
+    private final AtomicBoolean sentToDlq = new AtomicBoolean(false);
+
+    @Before
+    public void setUp() throws Exception {
+        throwMaxInflatedException.set(false);
+        sentToDlq.set(false);
 
-    protected void setUp() throws Exception {
         broker = new BrokerService();
+        broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
+            @Override
+            public Broker installPlugin(Broker broker) {
+                return new BrokerFilter(broker) {
+                    @Override
+                    public void preProcessDispatch(MessageDispatch 
messageDispatch) {
+                        super.preProcessDispatch(messageDispatch);
+                        // simulate a max inflated data size exception during 
protocol
+                        // conversion and decompression
+                        if (throwMaxInflatedException.get()) {
+                            try {
+                                throw new 
MaxInflatedDataSizeExceededException("Test");
+                            } catch (ActiveMQUnmarshalEOFException e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                    }
+
+                    @Override
+                    public boolean sendToDeadLetterQueue(ConnectionContext 
context,
+                            MessageReference messageReference, Subscription 
subscription,
+                            Throwable poisonCause) {
+                        sentToDlq.set(true);
+                        return super.sendToDeadLetterQueue(context, 
messageReference,
+                                subscription, poisonCause);
+                    }
+                };
+            }
+        }});
+
         connectionUri = 
broker.addConnector(BROKER_URL).getPublishableConnectString();
         broker.start();
         queue = new ActiveMQQueue("TEST." + System.currentTimeMillis());
     }
 
-    protected void tearDown() throws Exception {
+    @After
+    public void tearDown() throws Exception {
         if (broker != null) {
             broker.stop();
         }
     }
 
+    @Test
     public void testTextMessageCompression() throws Exception {
 
         ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(connectionUri);
@@ -79,6 +137,7 @@ public class MessageCompressionTest extends TestCase {
                 compressedSize < unCompressedSize);
     }
 
+    @Test
     public void testBytesMessageCompression() throws Exception {
 
         ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(connectionUri);
@@ -86,9 +145,9 @@ public class MessageCompressionTest extends TestCase {
         sendTestBytesMessage(factory, TEXT);
         ActiveMQBytesMessage message = receiveTestBytesMessage(factory);
         int compressedSize = message.getContent().getLength();
-        byte[] bytes = new byte[TEXT.getBytes("UTF8").length];
+        byte[] bytes = new byte[TEXT.getBytes(StandardCharsets.UTF_8).length];
         message.readBytes(bytes);
-        assertTrue(message.readBytes(new byte[255]) == -1);
+        assertEquals(-1, message.readBytes(new byte[255]));
         String rcvString = new String(bytes, "UTF8");
         assertEquals(TEXT, rcvString);
         assertTrue(message.isCompressed());
@@ -100,7 +159,55 @@ public class MessageCompressionTest extends TestCase {
         int unCompressedSize = message.getContent().getLength();
 
         assertTrue("expected: compressed Size '" + compressedSize + "' < 
unCompressedSize '" + unCompressedSize + "'",
-                   compressedSize < unCompressedSize);
+                compressedSize < unCompressedSize);
+    }
+
+    // Test that an error during dispatch goes to the DLQ
+    @Test
+    public void testMaxInflatedSizeDlq() throws Exception {
+
+        ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(connectionUri);
+        factory.setUseCompression(true);
+        ActiveMQConnection con1 = (ActiveMQConnection) 
factory.createConnection();
+        con1.start();
+
+        Session session1 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session1.createProducer(queue);
+        ActiveMQBytesMessage bytesMessage = (ActiveMQBytesMessage) 
session1.createBytesMessage();
+        bytesMessage.writeBytes(TEXT.getBytes(StandardCharsets.UTF_8));
+        producer.send(bytesMessage);
+
+        assertTrue(Wait.waitFor(() -> broker.getDestination(queue)
+                .getDestinationStatistics().getMessages().getCount() == 1, 
1000, 10));
+        assertFalse(sentToDlq.get());
+
+        // simulate a decompression error
+        // this should poison ack and DLQ and we shouldn't get the message
+        // but the connection should still be open
+        this.throwMaxInflatedException.set(true);
+
+        ActiveMQConnection con2 = (ActiveMQConnection) 
factory.createConnection();
+        con2.start();
+        Session session2 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session2.createConsumer(queue);
+        assertNull(consumer.receive(1000));
+
+        // verify message is gone off the dest and went to the DLQ
+        assertTrue(Wait.waitFor(() -> broker.getDestination(queue)
+                .getDestinationStatistics().getMessages().getCount() == 0, 
500, 10));
+        assertTrue(sentToDlq.get());
+
+        // no longer throw an exception
+        this.throwMaxInflatedException.set(false);
+        sentToDlq.set(false);
+
+        // exception has been disabled so we should receive again on the same 
connection
+        producer.send(bytesMessage);
+        assertNotNull(consumer.receive(1000));
+        assertFalse(sentToDlq.get());
+
+        con1.close();
+        con2.close();
     }
 
     private void sendTestMessage(ActiveMQConnectionFactory factory, String 
message) throws JMSException {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to