This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.19.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.19.x by this push:
     new 0622d21531 Add support for maxInflateDataSize to limit uncompressed 
buffers (#2139) (#2148)
0622d21531 is described below

commit 0622d21531a0788cae92b832dcde729635077e51
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Tue Jun 23 13:43:42 2026 -0400

    Add support for maxInflateDataSize to limit uncompressed buffers (#2139) 
(#2148)
    
    This change adds support for limiting the maximum inflation size of a
    message body when a message needs to be decompressed. This new setting
    will help prevent OOM errors from large buffers being allocated.
    
    The primary concern this is addressing is that compressed messages may
    be smaller than maxFrameSize and the broker will accept them, but if an
    event triggers a decompression a huge buffer could be created and cause
    OOM.
    
    The broker will have a new maxInflateDataSize config that is broker wide
    because the value isn't tied to a protocol and it will default to 100 MB.
    Clients are tied to a transport, so it makes more sense to make it as a
    ratio of maxFrameSize. The default is 10x maxFrameSize if configured
    which should be enough of a buffer under normal circumstances. The ratio
    can be changed using maxInflatedDataSizeRatio on a connection.
    
    (cherry picked from commit 94a40565f146fa3e134085637f4f676f9cc6e16b)
---
 .../transport/amqp/message/AmqpMessageSupport.java |  16 ++-
 .../transport/amqp/JMSInteroperabilityTest.java    |  36 ++++++
 .../message/JMSMappingOutboundTransformerTest.java |  26 ++++-
 .../org/apache/activemq/broker/BrokerService.java  |  14 +++
 .../activemq/broker/region/BaseDestination.java    |   5 +
 .../activemq/broker/region/DestinationFilter.java  |   5 +
 .../org/apache/activemq/ActiveMQConnection.java    |  24 ++++
 .../apache/activemq/ActiveMQConnectionFactory.java |  21 ++++
 .../activemq/command/ActiveMQBytesMessage.java     |   9 +-
 .../activemq/command/ActiveMQMapMessage.java       |   3 +-
 .../activemq/command/ActiveMQObjectMessage.java    |   4 +-
 .../activemq/command/ActiveMQStreamMessage.java    |  12 +-
 .../activemq/command/ActiveMQTextMessage.java      |   3 +-
 .../java/org/apache/activemq/command/Message.java  |  15 ++-
 .../apache/activemq/openwire/OpenWireFormat.java   |   1 +
 .../apache/activemq/util/MarshallingSupport.java   |  24 +++-
 .../transport/mqtt/MQTTProtocolConverter.java      |   5 +
 .../apache/activemq/transport/mqtt/MQTTTest.java   |  47 ++++++++
 .../apache/activemq/transport/stomp/StompTest.java |  52 +++++++++
 .../activemq/command/ActiveMQBytesMessageTest.java |  31 ++++++
 .../activemq/command/ActiveMQMapMessageTest.java   |  13 ++-
 .../command/ActiveMQObjectMessageTest.java         |  20 +++-
 .../command/ActiveMQStreamMessageTest.java         |  20 +++-
 .../activemq/command/ActiveMQTextMessageTest.java  |  15 ++-
 .../activemq/command/MessageCompressionTest.java   | 123 +++++++++++++++++++--
 25 files changed, 516 insertions(+), 28 deletions(-)

diff --git 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
index 86151a1aa8..c454b79af7 100644
--- 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
+++ 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java
@@ -18,6 +18,7 @@ package org.apache.activemq.transport.amqp.message;
 
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.nio.charset.Charset;
@@ -38,6 +39,7 @@ import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.util.MarshallingSupport;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Data;
@@ -213,6 +215,12 @@ public final class AmqpMessageSupport {
 
             if (message.isCompressed()) {
                 int length = (int) message.getBodyLength();
+                // before we allocate the buffer ensure it's not too large
+                try {
+                    
MarshallingSupport.validateMaxInflatedDataSize(message.getMaxInflatedDataSize(),
 length);
+                } catch (IOException cause) {
+                    throw JMSExceptionSupport.create(cause);
+                }
                 byte[] uncompressed = new byte[length];
                 message.readBytes(uncompressed);
 
@@ -244,7 +252,9 @@ public final class AmqpMessageSupport {
             if (message.isCompressed()) {
                 try (ByteArrayOutputStream os = new ByteArrayOutputStream();
                      ByteArrayInputStream is = new 
ByteArrayInputStream(contents);
-                     InflaterInputStream iis = new InflaterInputStream(is);) {
+                     // wrap to prevent allocating more than 
maxInflatedDataSize
+                     InputStream iis = 
MarshallingSupport.createInflaterInputStream(
+                            message.getMaxInflatedDataSize(), is)) {
 
                     byte value;
                     while ((value = (byte) iis.read()) != -1) {
@@ -282,10 +292,14 @@ public final class AmqpMessageSupport {
 
             if (message.isCompressed()) {
                 try (ByteArrayInputStream is = new 
ByteArrayInputStream(contents);
+                     // We do not need to wrap this stream, the size is 
validated below
+                     // before allocation
                      InflaterInputStream iis = new InflaterInputStream(is);
                      DataInputStream dis = new DataInputStream(iis);) {
 
                     int size = dis.readInt();
+                    // before we allocate the buffer ensure it's not too large
+                    
MarshallingSupport.validateMaxInflatedDataSize(message.getMaxInflatedDataSize(),
 size);
                     byte[] uncompressed = new byte[size];
                     dis.readFully(uncompressed);
 
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
index 0e3ab93dee..c7e4c5d29d 100644
--- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
@@ -519,6 +519,42 @@ public class JMSInteroperabilityTest extends 
JMSClientTestSupport {
 
     }
 
+    @Test
+    public void testOpenWireToQpidCompressionFailure() throws Exception {
+
+        // Raw Transformer doesn't expand message properties.
+        assumeFalse(!transformer.equals("jms"));
+
+        // set to 512 bytes
+        brokerService.setMaxInflatedDataSize(512);
+
+        try (Connection openwire = createJMSConnection(); Connection amqp = 
createConnection()) {
+            ((ActiveMQConnection) openwire).setUseCompression(true);
+            openwire.start();
+            amqp.start();
+
+            Session openwireSession = openwire.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Session amqpSession = amqp.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            Destination queue = 
openwireSession.createQueue(getDestinationName());
+            MessageProducer openwireProducer = 
openwireSession.createProducer(queue);
+            MessageConsumer amqpConsumer = amqpSession.createConsumer(queue);
+
+            StringBuilder builder = new StringBuilder();
+            // generate a string longer than 512 bytes
+            for (int i = 1; i <= 50; i++) {
+                builder.append("compresedpayload");
+            }
+            // Create and send the Message
+            
openwireProducer.send(openwireSession.createTextMessage(builder.toString()));
+
+            // There should be an error triggered on dispatch during 
decompression
+            // and message should go to the DLQ
+            assertNull(amqpConsumer.receive(1000));
+            assertTrue(sentToDlq.get());
+        }
+    }
+
     // The following tests for corruption will corrupt the headers or body
     // to test that the AMQP protocol correctly passes the error during
     // dispatch to allow the Transport Connection to properly handle
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
index 1d3adea1bf..afb9548a75 100644
--- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java
@@ -32,6 +32,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -754,8 +755,23 @@ public class JMSMappingOutboundTransformerTest {
 
     @Test
     public void testConvertCompressedTextMessageCreatesDataSectionBody() 
throws Exception {
+        ActiveMQTextMessage outbound = 
createTextMessage("myTextMessageContent", true);
+        ActiveMQConnection connection = Mockito.mock(ActiveMQConnection.class);
+        Mockito.when(connection.isUseCompression()).thenReturn(true);
+        // override to trigger an error with decompression
+        Mockito.when(connection.getMaxInflatedDataSize()).thenReturn(10);
+        outbound.setConnection(connection);
+        assertThrows(JMSException.class, () -> 
testConvertCompressedTextMessageCreatesDataSectionBody(outbound));
+    }
+
+    @Test
+    public void 
testConvertCompressedTextMessageCreatesDataSectionBodyMaxInflatedSize() throws 
Exception {
+        ActiveMQTextMessage outbound = 
createTextMessage("myTextMessageContent", true);
+        testConvertCompressedTextMessageCreatesDataSectionBody(outbound);
+    }
+
+    private void 
testConvertCompressedTextMessageCreatesDataSectionBody(ActiveMQTextMessage 
outbound) throws Exception {
         String contentString = "myTextMessageContent";
-        ActiveMQTextMessage outbound = createTextMessage(contentString, true);
         outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
         outbound.onSend();
         outbound.storeContent();
@@ -775,7 +791,8 @@ public class JMSMappingOutboundTransformerTest {
         String contents = new String(data.getArray(), data.getArrayOffset(), 
data.getLength(), StandardCharsets.UTF_8);
         assertEquals(contentString, contents);
     }
-    
+
+
     @Test 
     public void testConvertConnectionInfo() throws Exception {
        String connectionId = "myConnectionId";
@@ -1014,6 +1031,7 @@ public class JMSMappingOutboundTransformerTest {
         if (compression) {
             ActiveMQConnection connection = 
Mockito.mock(ActiveMQConnection.class);
             Mockito.when(connection.isUseCompression()).thenReturn(true);
+            
Mockito.when(connection.getMaxInflatedDataSize()).thenReturn(Integer.MAX_VALUE);
             message.setConnection(connection);
         }
 
@@ -1030,6 +1048,7 @@ public class JMSMappingOutboundTransformerTest {
         if (compression) {
             ActiveMQConnection connection = 
Mockito.mock(ActiveMQConnection.class);
             Mockito.when(connection.isUseCompression()).thenReturn(true);
+            
Mockito.when(connection.getMaxInflatedDataSize()).thenReturn(Integer.MAX_VALUE);
             message.setConnection(connection);
         }
 
@@ -1046,6 +1065,7 @@ public class JMSMappingOutboundTransformerTest {
         if (compression) {
             ActiveMQConnection connection = 
Mockito.mock(ActiveMQConnection.class);
             Mockito.when(connection.isUseCompression()).thenReturn(true);
+            
Mockito.when(connection.getMaxInflatedDataSize()).thenReturn(Integer.MAX_VALUE);
             message.setConnection(connection);
         }
 
@@ -1066,6 +1086,7 @@ public class JMSMappingOutboundTransformerTest {
         if (compression) {
             ActiveMQConnection connection = 
Mockito.mock(ActiveMQConnection.class);
             Mockito.when(connection.isUseCompression()).thenReturn(true);
+            
Mockito.when(connection.getMaxInflatedDataSize()).thenReturn(Integer.MAX_VALUE);
             result.setConnection(connection);
         }
 
@@ -1100,6 +1121,7 @@ public class JMSMappingOutboundTransformerTest {
         if (compression) {
             ActiveMQConnection connection = 
Mockito.mock(ActiveMQConnection.class);
             Mockito.when(connection.isUseCompression()).thenReturn(true);
+            
Mockito.when(connection.getMaxInflatedDataSize()).thenReturn(Integer.MAX_VALUE);
             result.setConnection(connection);
         }
 
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index b3e452c498..0bbb3f7197 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -267,6 +267,7 @@ public class BrokerService implements Service {
     private final List<Runnable> preShutdownHooks = new 
CopyOnWriteArrayList<>();
 
     private int maxUncommittedCount = DEFAULT_MAX_UNCOMMITTED_COUNT;
+    private int maxInflatedDataSize = 
OpenWireFormat.DEFAULT_MAX_INFLATED_DATA_SIZE;
 
     static {
 
@@ -3337,4 +3338,17 @@ public class BrokerService implements Service {
         this.maxUncommittedCount = maxUncommittedCount;
     }
 
+    public int getMaxInflatedDataSize() {
+        return maxInflatedDataSize;
+    }
+
+    /**
+     * Set the maximum size that a compressed message can inflate to
+     * if a message has to be decompressed.
+     *
+     * @param maxInflatedDataSize
+     */
+    public void setMaxInflatedDataSize(int maxInflatedDataSize) {
+        this.maxInflatedDataSize = maxInflatedDataSize;
+    }
 }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index f8ba76cca8..4c69522299 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -932,4 +932,9 @@ public abstract class BaseDestination implements 
Destination {
     public void setMessageInterceptorStrategy(MessageInterceptorStrategy 
messageInterceptorStrategy) {
         this.messageInterceptorStrategy = messageInterceptorStrategy;
     }
+
+    @Override
+    public int getMaxInflatedDataSize() {
+        return brokerService.getMaxInflatedDataSize();
+    }
 }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
index 1ab96560ac..154f013d6c 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
@@ -439,6 +439,11 @@ public class DestinationFilter implements Destination {
         }
     }
 
+    @Override
+    public int getMaxInflatedDataSize() {
+        return next.getMaxInflatedDataSize();
+    }
+
     public Destination getNext() {
         return next;
     }
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java 
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index 8dde1b9c98..31f3cdedcb 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -145,6 +145,9 @@ public class ActiveMQConnection implements Connection, 
TopicConnection, QueueCon
     private boolean optimizedMessageDispatch = true;
     private boolean copyMessageOnSend = true;
     private boolean useCompression;
+    private double maxInflatedDataSizeRatio = 
ActiveMQConnectionFactory.DEFAULT_MAX_INFLATED_DATA_SIZE_RATIO;
+    // This will be configured during negotiation if maxFrameSize has been 
configured.
+    private int maxInflatedDataSize = Integer.MAX_VALUE;
     private boolean objectMessageSerializationDefered;
     private boolean useAsyncSend;
     private boolean optimizeAcknowledge;
@@ -1993,6 +1996,15 @@ public class ActiveMQConnection implements Connection, 
TopicConnection, QueueCon
         if(tmpMaxFrameSize > 0) {
             maxFrameSize.set(tmpMaxFrameSize);
         }
+
+        // Compute the maxInflatedData size as a ratio of maxFrameSize
+        // This prevents overflow and sets to Integer.MAX_VALUE if too large
+        double updatedMaxInflated = (double)tmpMaxFrameSize * 
maxInflatedDataSizeRatio;
+        if (Double.isInfinite(updatedMaxInflated) || updatedMaxInflated > 
Integer.MAX_VALUE) {
+            this.maxInflatedDataSize = Integer.MAX_VALUE;
+        } else {
+            this.maxInflatedDataSize = (int) updatedMaxInflated;
+        }
     }
 
     /**
@@ -2187,6 +2199,18 @@ public class ActiveMQConnection implements Connection, 
TopicConnection, QueueCon
         this.useCompression = useCompression;
     }
 
+    public int getMaxInflatedDataSize() {
+        return maxInflatedDataSize;
+    }
+
+    public double getMaxInflatedDataSizeRatio() {
+        return maxInflatedDataSizeRatio;
+    }
+
+    public void setMaxInflatedDataSizeRatio(double maxInflatedDataSizeRatio) {
+        this.maxInflatedDataSizeRatio = maxInflatedDataSizeRatio;
+    }
+
     public void destroyDestination(ActiveMQDestination destination) throws 
JMSException {
 
         checkClosedOrFailed();
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
 
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
index 2324448bba..212265e5a8 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
@@ -119,6 +119,9 @@ public class ActiveMQConnectionFactory extends 
JNDIBaseStorable implements Conne
     public static final String DEFAULT_USER = null;
     public static final String DEFAULT_PASSWORD = null;
     public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0;
+    // The default ratio for maxInflatedDataSize. The default is 10x the size
+    // of maxFrameSize
+    public static final double DEFAULT_MAX_INFLATED_DATA_SIZE_RATIO = 10.0;
 
     protected URI brokerURL;
     protected String userName;
@@ -149,6 +152,7 @@ public class ActiveMQConnectionFactory extends 
JNDIBaseStorable implements Conne
     private long optimizedAckScheduledAckInterval = 0;
     private boolean copyMessageOnSend = true;
     private boolean useCompression;
+    private double maxInflatedDataSizeRatio = 
DEFAULT_MAX_INFLATED_DATA_SIZE_RATIO;
     private boolean objectMessageSerializationDefered;
     private boolean useAsyncSend;
     private boolean optimizeAcknowledge;
@@ -432,6 +436,7 @@ public class ActiveMQConnectionFactory extends 
JNDIBaseStorable implements Conne
         connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
         connection.setCopyMessageOnSend(isCopyMessageOnSend());
         connection.setUseCompression(isUseCompression());
+        connection.setMaxInflatedDataSizeRatio(getMaxInflatedDataSizeRatio());
         
connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
         connection.setDispatchAsync(isDispatchAsync());
         connection.setUseAsyncSend(isUseAsyncSend());
@@ -864,6 +869,7 @@ public class ActiveMQConnectionFactory extends 
JNDIBaseStorable implements Conne
 
         props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend()));
         props.setProperty("useCompression", 
Boolean.toString(isUseCompression()));
+        props.setProperty("maxInflatedDataSizeRatio", 
Double.toString(getMaxInflatedDataSizeRatio()));
         props.setProperty("useRetroactiveConsumer", 
Boolean.toString(isUseRetroactiveConsumer()));
         props.setProperty("watchTopicAdvisories", 
Boolean.toString(isWatchTopicAdvisories()));
 
@@ -904,6 +910,21 @@ public class ActiveMQConnectionFactory extends 
JNDIBaseStorable implements Conne
         this.useCompression = useCompression;
     }
 
+    public double getMaxInflatedDataSizeRatio() {
+        return maxInflatedDataSizeRatio;
+    }
+
+    /**
+     * Set the ratio to use to compute maxInflatedDataSize which controls
+     * how large a decompressed message buffer can be. maxInflatedDataSize
+     * is computed as maxFrameSize * maxInflatedDataSizeRatio.
+     *
+     * @param maxInflatedDataSizeRatio
+     */
+    public void setMaxInflatedDataSizeRatio(double maxInflatedDataSizeRatio) {
+        this.maxInflatedDataSizeRatio = maxInflatedDataSizeRatio;
+    }
+
     public boolean isObjectMessageSerializationDefered() {
         return objectMessageSerializationDefered;
     }
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
index 2800050b62..f46ef1c60c 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
@@ -39,6 +39,7 @@ import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.ByteSequenceData;
 import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.util.MarshallingSupport;
 
 /**
  * A <CODE>BytesMessage</CODE> object is used to send a message containing a
@@ -901,11 +902,15 @@ public class ActiveMQBytesMessage extends ActiveMQMessage 
implements BytesMessag
         ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
         try {
             length = ByteSequenceData.readIntBig(dataSequence);
+            // verify the length of the buffer is not larger than 
maxInflatedDataSize
+            
MarshallingSupport.validateMaxInflatedDataSize(getMaxInflatedDataSize(), 
length);
             dataSequence.offset = 0;
-            byte[] data = Arrays.copyOfRange(dataSequence.getData(), 4, 
dataSequence.getLength());
-            inflater.setInput(data);
+            inflater.setInput(dataSequence.getData(), 4, 
dataSequence.getLength() - 4);
             byte[] buffer = new byte[length];
             int count = inflater.inflate(buffer);
+            if (count != length) {
+                throw new IllegalStateException("Inflated buffer size is 
different than expected size of " + length);
+            }
             decompressed.write(buffer, 0, count);
             return decompressed.toByteArray();
         } catch (Exception e) {
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
index 18ce65ede2..2fc44954bd 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
@@ -189,7 +189,8 @@ public class ActiveMQMapMessage extends ActiveMQMessage 
implements MapMessage {
             if (content != null) {
                 InputStream is = new ByteArrayInputStream(content);
                 if (isCompressed()) {
-                    is = MarshallingSupport.createInflaterInputStream(is);
+                    // wrap the stream so we don't inflate past 
maxInflatedDataSize
+                    is = 
MarshallingSupport.createInflaterInputStream(getMaxInflatedDataSize(), is);
                 }
                 DataInputStream dataIn = new DataInputStream(is);
                 map = MarshallingSupport.unmarshalPrimitiveMap(dataIn);
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
index 41b98b511c..839d69db46 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
@@ -38,6 +38,7 @@ import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.ClassLoadingAwareObjectInputStream;
 import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.util.MarshallingSupport;
 import org.apache.activemq.wireformat.WireFormat;
 
 /**
@@ -208,7 +209,8 @@ public class ActiveMQObjectMessage extends ActiveMQMessage 
implements ObjectMess
             try {
                 InputStream is = new ByteArrayInputStream(content);
                 if (isCompressed()) {
-                    is = new InflaterInputStream(is);
+                    // wrap the stream so we don't inflate past 
maxInflatedDataSize
+                    is = 
MarshallingSupport.createInflaterInputStream(getMaxInflatedDataSize(), is);
                 }
                 DataInputStream dataIn = new DataInputStream(is);
                 ClassLoadingAwareObjectInputStream objIn = new 
ClassLoadingAwareObjectInputStream(dataIn);
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
index 93afc01af7..d50d0d2c4b 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
@@ -833,6 +833,8 @@ public class ActiveMQStreamMessage extends ActiveMQMessage 
implements StreamMess
             }
             if (type == MarshallingSupport.BYTE_ARRAY_TYPE) {
                 int len = this.dataIn.readInt();
+                // verify that there are enough bytes remaining before 
allocation
+                MarshallingSupport.validateBufferSizeRemaining(dataIn, len);
                 byte[] value = new byte[len];
                 this.dataIn.readFully(value);
                 return value;
@@ -1165,10 +1167,15 @@ public class ActiveMQStreamMessage extends 
ActiveMQMessage implements StreamMess
                 if (compressed) {
                     ByteArrayInputStream input = new 
ByteArrayInputStream(this.content.getData(), this.content.getOffset(), 
this.content.getLength());
                     InflaterInputStream inflater = new 
InflaterInputStream(input);
+                    int total = 0;
                     try {
                         byte[] buffer = new byte[8*1024];
                         int read = 0;
                         while ((read = inflater.read(buffer)) != -1) {
+                            total = Math.addExact(total, read);
+                            // each time through the loop see if we are >= max 
inflated size so we stop
+                            // by doing this here we might go slightly pass 
the limit (up to 8 KB) but that is fine
+                            
MarshallingSupport.validateMaxInflatedDataSize(getMaxInflatedDataSize(), total);
                             this.dataOut.write(buffer, 0, read);
                         }
                     } finally {
@@ -1203,7 +1210,10 @@ public class ActiveMQStreamMessage extends 
ActiveMQMessage implements StreamMess
             if (isCompressed()) {
                 is = new InflaterInputStream(is);
                 is = new BufferedInputStream(is);
-                is = 
MarshallingSupport.createFrameLimitedInputStream(Integer.MAX_VALUE, is);
+                // Wrap the buffered stream in a frame limited stream so we 
can error if we exceed
+                // max inflate size
+                is = 
MarshallingSupport.createFrameLimitedInputStream(getMaxInflatedDataSize(), is);
+
             }
             this.dataIn = new DataInputStream(is);
         }
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
index 41de1ca635..fe25d29e0c 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
@@ -95,7 +95,8 @@ public class ActiveMQTextMessage extends ActiveMQMessage 
implements TextMessage
             try {
                 is = new ByteArrayInputStream(bodyAsBytes);
                 if (isCompressed()) {
-                    is = MarshallingSupport.createInflaterInputStream(is);
+                    // wrap the stream so we don't inflate past 
maxInflatedDataSize
+                    is = 
MarshallingSupport.createInflaterInputStream(getMaxInflatedDataSize(), is);
                 }
                 DataInputStream dataIn = new DataInputStream(is);
                 text = MarshallingSupport.readUTF8(dataIn);
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/command/Message.java 
b/activemq-client/src/main/java/org/apache/activemq/command/Message.java
index 8516962548..2ce949566a 100644
--- a/activemq-client/src/main/java/org/apache/activemq/command/Message.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/Message.java
@@ -33,6 +33,7 @@ import javax.jms.JMSException;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.activemq.util.ByteArrayOutputStream;
@@ -102,9 +103,10 @@ public abstract class Message extends BaseCommand 
implements MarshallAware, Mess
     private BrokerId[] brokerPath;
     private BrokerId[] cluster;
 
-    public static interface MessageDestination {
+    public interface MessageDestination {
         int getMinimumMessageSize();
         MemoryUsage getMemoryUsage();
+        int getMaxInflatedDataSize();
     }
 
     public abstract Message copy();
@@ -871,4 +873,15 @@ public abstract class Message extends BaseCommand 
implements MarshallAware, Mess
         }
         return this;
     }
+
+    public int getMaxInflatedDataSize() {
+        // If this is set then this is on a broker
+        if (regionDestination != null) {
+            return regionDestination.getMaxInflatedDataSize();
+            // connection is set on Clients
+        } else if (connection != null) {
+            return connection.getMaxInflatedDataSize();
+        }
+        return OpenWireFormat.DEFAULT_MAX_INFLATED_DATA_SIZE;
+    }
 }
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
 
b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
index 8914d3c96f..62373111b8 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
@@ -43,6 +43,7 @@ public final class OpenWireFormat implements WireFormat {
     public static final int DEFAULT_WIRE_VERSION = 
CommandTypes.PROTOCOL_VERSION;
     public static final int DEFAULT_LEGACY_VERSION = 
CommandTypes.PROTOCOL_LEGACY_STORE_VERSION;
     public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE;
+    public static final int DEFAULT_MAX_INFLATED_DATA_SIZE = 1024 * 1024 * 100;
 
     static final byte NULL_TYPE = CommandTypes.NULL;
     private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2;
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/util/MarshallingSupport.java
 
b/activemq-client/src/main/java/org/apache/activemq/util/MarshallingSupport.java
index 61fa278561..29afc27087 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/util/MarshallingSupport.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/util/MarshallingSupport.java
@@ -60,15 +60,27 @@ public final class MarshallingSupport {
 
     private MarshallingSupport() {}
 
-    // TODO: This will be limited in a future PR to something besides 
Integer.MAX_VALUE
-    public static InputStream createInflaterInputStream(InputStream is) {
-        return createFrameLimitedInputStream(Integer.MAX_VALUE, new 
InflaterInputStream(is));
+    public static InputStream createInflaterInputStream(int maxAvailable, 
InputStream is) {
+        return createFrameLimitedInputStream(maxAvailable, new 
InflaterInputStream(is));
     }
 
     public static InputStream createFrameLimitedInputStream(int maxAvailable, 
InputStream is) {
         return new FrameSizeLimitedFilterInputStream(maxAvailable, is);
     }
 
+    // Validate that the size value is not greater than the max available size
+    public static void validateMaxInflatedDataSize(int maxAvailable, int size) 
throws IOException {
+        if (size > maxAvailable) {
+            throw new MaxInflatedDataSizeExceededException(
+                    "Cannot read more than the uncompressed size bytes: 
requested " + size);
+        }
+    }
+
+    // Validate the size value is not greater than the remaining bytes in the 
stream
+    public static void validateBufferSizeRemaining(DataInputStream stream, int 
size) throws IOException {
+        validateBufferSize(stream, Integer.MAX_VALUE, size);
+    }
+
     public static void marshalPrimitiveMap(Map<String, Object> map, 
DataOutputStream out) throws IOException {
         if (map == null) {
             out.writeInt(-1);
@@ -486,4 +498,10 @@ public final class MarshallingSupport {
         }
     }
 
+    public static class MaxInflatedDataSizeExceededException extends 
ActiveMQUnmarshalEOFException {
+        public MaxInflatedDataSizeExceededException(String message) {
+            super(message);
+        }
+    }
+
 }
diff --git 
a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
 
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index e6c6e16501..5472cc5d48 100644
--- 
a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ 
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -60,6 +60,7 @@ import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.JMSExceptionSupport;
 import org.apache.activemq.util.LRUCache;
 import org.apache.activemq.util.LongSequenceGenerator;
+import org.apache.activemq.util.MarshallingSupport;
 import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtbuf.UTF8Buffer;
 import org.fusesource.mqtt.client.QoS;
@@ -636,8 +637,12 @@ public class MQTTProtocolConverter {
                     inflater.setInput(byteSequence.data, byteSequence.offset, 
byteSequence.length);
                     byte[] data = new byte[4096];
                     int read;
+                    int total = 0;
                     ByteArrayOutputStream bytesOut = new 
ByteArrayOutputStream();
                     while ((read = inflater.inflate(data)) != 0) {
+                        total = Math.addExact(total, read);
+                        // check if we have exceeded maxInflatedDataSize before continuing
+                        
MarshallingSupport.validateMaxInflatedDataSize(message.getMaxInflatedDataSize(),
 total);
                         bytesOut.write(data, 0, read);
                     }
                     byteSequence = bytesOut.toByteSequence();
diff --git 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index be866faec5..b720be701b 100644
--- 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -1186,6 +1186,53 @@ public class MQTTTest extends MQTTTestSupport {
                 .getDestinationStatistics().getMessages().getCount() == 0, 
500, 10));
     }
 
+    @Test
+    public void testMaxInflatedDataSizeErrorBytes() throws Exception {
+        testMaxInflatedDataSizeError(true);
+    }
+
+    @Test
+    public void testMaxInflatedDataSizeErrorText() throws Exception {
+        testMaxInflatedDataSizeError(false);
+    }
+
+    private void testMaxInflatedDataSizeError(boolean bytes) throws Exception {
+        final MQTTClientProvider provider = getMQTTClientProvider();
+        initializeConnection(provider);
+
+        brokerService.setMaxInflatedDataSize(10);
+        String destinationName = "foo.far";
+        ActiveMQConnection activeMQConnection = (ActiveMQConnection) 
cf.createConnection();
+        activeMQConnection.setUseCompression(true);
+        activeMQConnection.start();
+        Session s = activeMQConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        javax.jms.Topic jmsTopic = s.createTopic(destinationName);
+        MessageProducer producer = s.createProducer(jmsTopic);
+
+        provider.subscribe("foo/+", AT_MOST_ONCE);
+        ActiveMQMessage sendMessage;
+        if (bytes) {
+            BytesMessage bytesMessage = s.createBytesMessage();
+            bytesMessage.writeBytes("bodybodybodybodybody".getBytes());
+            sendMessage = (ActiveMQMessage) bytesMessage;
+        } else {
+            sendMessage = (ActiveMQMessage) 
s.createTextMessage("bodybodybodybodybody");
+        }
+        // marshal and clear so the broker will have to decompress
+        sendMessage.storeContentAndClear();
+        producer.send(sendMessage);
+
+        byte[] message = provider.receive(1000);
+        assertNull("Should not get message", message);
+
+        provider.disconnect();
+        activeMQConnection.close();
+
+        // verify message is gone off the dest
+        assertTrue(Wait.waitFor(() -> brokerService.getDestination(new 
ActiveMQTopic(destinationName))
+                .getDestinationStatistics().getMessages().getCount() == 0, 
500, 10));
+    }
+
     @Test(timeout = 60 * 1000)
     public void testPingKeepsInactivityMonitorAlive() throws Exception {
         MQTT mqtt = createMQTTConnection();
diff --git 
a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
 
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
index f0612cf61b..5feb3cb960 100644
--- 
a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
+++ 
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
@@ -375,6 +375,58 @@ public class StompTest extends StompTestSupport {
         assertTrue(sentToDlq.get());
     }
 
+    @Test(timeout = 60000)
+    public void testMaxInflatedDataSizeErrorDlqText() throws Exception {
+        testMaxInflatedDataSizeErrorDlq(false);
+    }
+
+    @Test(timeout = 60000)
+    public void testMaxInflatedDataSizeErrorDlqBytes() throws Exception {
+        testMaxInflatedDataSizeErrorDlq(true);
+    }
+
+    private void testMaxInflatedDataSizeErrorDlq(boolean bytes) throws 
Exception {
+        String body = "testtesttesttesttesttest";
+
+        // set a tiny max size to trigger an error on dispatch
+        brokerService.setMaxInflatedDataSize(10);
+        ((ActiveMQConnection)connection).setUseCompression(true);
+        MessageProducer producer = session.createProducer(queue);
+
+        String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" 
+ Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+        frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" 
+ "ack:auto\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        // marshal and clear so the broker will have to decompress
+        ActiveMQMessage m;
+        if (bytes) {
+            BytesMessage bytesMessage = session.createBytesMessage();
+            bytesMessage.writeBytes(body.getBytes());
+            m = (ActiveMQMessage) bytesMessage;
+        } else {
+            m = (ActiveMQMessage) session.createTextMessage(body);
+        }
+        m.storeContentAndClear();
+        producer.send(m);
+
+        // Message should be DLQ'd because it exceeds max inflated data size
+        try {
+            StompFrame frameNull = stompConnection.receive(500);
+            if (frameNull != null) {
+                fail("Should not have received any messages");
+            }
+        } catch (SocketTimeoutException soe) {}
+
+        // verify message is gone off the dest and went to the DLQ
+        assertTrue(Wait.waitFor(() -> brokerService.getDestination(queue)
+                .getDestinationStatistics().getMessages().getCount() == 0, 
500, 10));
+        assertTrue(sentToDlq.get());
+    }
+
     @Test(timeout = 60000)
     public void testJMSXGroupIdCanBeSet() throws Exception {
 
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQBytesMessageTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQBytesMessageTest.java
index fe86019be9..b562fd7e59 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQBytesMessageTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQBytesMessageTest.java
@@ -16,12 +16,19 @@
  */
 package org.apache.activemq.command;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import javax.jms.JMSException;
 import javax.jms.MessageFormatException;
 import javax.jms.MessageNotReadableException;
 import javax.jms.MessageNotWriteableException;
 
 import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.util.ByteSequenceData;
+import org.apache.activemq.util.ExceptionUtils;
+import 
org.apache.activemq.util.MarshallingSupport.ActiveMQUnmarshalEOFException;
 
 /**
  * 
@@ -509,4 +516,28 @@ public class ActiveMQBytesMessageTest extends TestCase {
         } catch (MessageNotReadableException e) {
         }
     }
+
+    public void testCompressedUnmarshalException() throws Exception {
+        ActiveMQConnection connection = mock(ActiveMQConnection.class);
+        when(connection.isUseCompression()).thenReturn(true);
+
+        ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
+        msg.setConnection(connection);
+        msg.writeDouble(3.3d);
+
+        // store and reset for reading
+        msg.reset();
+        assertTrue(msg.isCompressed());
+
+        // corrupt the buffer
+        ByteSequenceData.writeIntBig(msg.content, 100000);
+
+        try {
+            msg.readDouble();
+            fail("Should have thrown exception");
+        } catch (JMSException e) {
+            // expected
+            assertTrue(ExceptionUtils.getRootCause(e) instanceof 
ActiveMQUnmarshalEOFException);
+        }
+    }
 }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java
index 859a0d2455..46badfc293 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQMapMessageTest.java
@@ -477,7 +477,18 @@ public class ActiveMQMapMessageTest {
 
     @Test
     public void testUnmarshalException() throws Exception {
+        testUnmarshalException(false);
+    }
+
+    @Test
+    public void testCompressedUnmarshalException() throws Exception {
+        testUnmarshalException(true);
+    }
+
+    // For map messages both compressed and uncompressed need to be 
unmarshalled
+    private void testUnmarshalException(boolean compressed) throws Exception {
         ActiveMQConnection connection = mock(ActiveMQConnection.class);
+        when(connection.isUseCompression()).thenReturn(compressed);
 
         ActiveMQMapMessage msg = new ActiveMQMapMessage();
         msg.setConnection(connection);
@@ -486,6 +497,7 @@ public class ActiveMQMapMessageTest {
         // store and marshal
         msg.storeContentAndClear();
         assertTrue(msg.map.isEmpty());
+        assertEquals(compressed, msg.isCompressed());
 
         // corrupt the buffer
         ByteSequenceData.writeIntBig(msg.content, 1000);
@@ -500,5 +512,4 @@ public class ActiveMQMapMessageTest {
                     ExceptionUtils.getRootCause(e) instanceof 
ActiveMQUnmarshalEOFException);
         }
     }
-
 }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQObjectMessageTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQObjectMessageTest.java
index 60e661a929..cbf9851b40 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQObjectMessageTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQObjectMessageTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.command;
 
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 
@@ -28,6 +29,7 @@ import javax.jms.MessageNotWriteableException;
 import junit.framework.TestCase;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.util.ByteSequenceData;
+import 
org.apache.activemq.util.MarshallingSupport.ActiveMQUnmarshalEOFException;
 import org.apache.commons.lang.exception.ExceptionUtils;
 
 /**
@@ -129,8 +131,17 @@ public class ActiveMQObjectMessageTest extends TestCase {
         }
     }
 
-    public void testUnCompressedException() throws Exception {
+    public void testUnCompressedUnmarshalException() throws Exception {
+        testUnmarshalException(false);
+    }
+
+    public void testCompressedUnmarshalException() throws Exception {
+        testUnmarshalException(true);
+    }
+
+    private void testUnmarshalException(boolean compressed) throws Exception {
         ActiveMQConnection connection = mock(ActiveMQConnection.class);
+        when(connection.isUseCompression()).thenReturn(compressed);
 
         ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
         msg.setConnection(connection);
@@ -139,6 +150,7 @@ public class ActiveMQObjectMessageTest extends TestCase {
         // store and marshal
         msg.storeContentAndClear();
         assertNull(msg.object);
+        assertEquals(compressed, msg.isCompressed());
 
         // corrupt the buffer
         ByteSequenceData.writeIntBig(msg.content, 1000);
@@ -150,6 +162,12 @@ public class ActiveMQObjectMessageTest extends TestCase {
         } catch (JMSException e) {
             // uncompressed will have an error from the JDK deserialization
             assertTrue(ExceptionUtils.getRootCause(e) instanceof IOException);
+
+            // our validation causes ActiveMQUnmarshalEOFException for a compressed stream
+            if (compressed) {
+                // expected
+                assertTrue(ExceptionUtils.getRootCause(e) instanceof 
ActiveMQUnmarshalEOFException);
+            }
         }
     }
 
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java
index 5f1e3465a0..e0ab4ebed2 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java
@@ -18,10 +18,12 @@ package org.apache.activemq.command;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import javax.jms.JMSException;
 import javax.jms.MessageEOFException;
@@ -31,8 +33,7 @@ import javax.jms.MessageNotWriteableException;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.util.ByteSequenceData;
-import 
org.apache.activemq.util.MarshallingSupport.ActiveMQUnmarshalEOFException;
-import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.activemq.util.ExceptionUtils;
 import org.junit.Test;
 
 /**
@@ -1081,7 +1082,17 @@ public class ActiveMQStreamMessageTest {
 
     @Test
     public void testUnmarshalException() throws Exception {
+        testUnmarshalException(false);
+    }
+
+    @Test
+    public void testCompressedUnmarshalException() throws Exception {
+        testUnmarshalException(true);
+    }
+
+    private void testUnmarshalException(boolean compressed) throws Exception {
         ActiveMQConnection connection = mock(ActiveMQConnection.class);
+        when(connection.isUseCompression()).thenReturn(compressed);
 
         ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
         msg.setConnection(connection);
@@ -1090,6 +1101,7 @@ public class ActiveMQStreamMessageTest {
         // store and marshal
         msg.reset();
         assertNull(msg.dataOut);
+        assertEquals(compressed, msg.isCompressed());
 
         // corrupt the buffer
         ByteSequenceData.writeIntBig(msg.content, 1000000);
@@ -1098,8 +1110,8 @@ public class ActiveMQStreamMessageTest {
             msg.readBytes(new byte[1024]);
             fail("Should have thrown exception");
         } catch (JMSException e) {
-            // expected
-            assertTrue(e instanceof MessageFormatException);
+            // if this is not null then there was an expected format exception
+            assertNotNull(ExceptionUtils.createMessageFormatException(e));
         }
     }
 }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java
index 7999e8ee81..7afb7662ed 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageTest.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.command;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.beans.Transient;
 import java.io.DataOutputStream;
@@ -162,8 +163,19 @@ public class ActiveMQTextMessageTest extends TestCase {
         assertTrue(method.isAnnotationPresent(Transient.class));
     }
 
-    public void testUnUnmarshalException() throws Exception {
+
+    public void testUnCompressedUnmarshalException() throws Exception {
+        testUnmarshalException(false);
+    }
+
+    public void testCompressedUnmarshalException() throws Exception {
+        testUnmarshalException(true);
+    }
+
+    // For text messages both compressed and uncompressed need to be 
unmarshalled
+    private void testUnmarshalException(boolean compressed) throws Exception {
         ActiveMQConnection connection = mock(ActiveMQConnection.class);
+        when(connection.isUseCompression()).thenReturn(compressed);
 
         ActiveMQTextMessage msg = new ActiveMQTextMessage();
         msg.setConnection(connection);
@@ -172,6 +184,7 @@ public class ActiveMQTextMessageTest extends TestCase {
         // store and marshal
         msg.storeContentAndClear();
         assertNull(msg.text);
+        assertEquals(compressed, msg.isCompressed());
 
         // corrupt the buffer
         ByteSequenceData.writeIntBig(msg.content, 1000);
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
index 4c77bfcdea..900e827915 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/command/MessageCompressionTest.java
@@ -16,6 +16,12 @@
  */
 package org.apache.activemq.command;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 import java.io.UnsupportedEncodingException;
 
 import javax.jms.BytesMessage;
@@ -24,13 +30,27 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 
-import junit.framework.TestCase;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
 import org.apache.activemq.broker.BrokerService;
-
-public class MessageCompressionTest extends TestCase {
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
+import 
org.apache.activemq.util.MarshallingSupport.ActiveMQUnmarshalEOFException;
+import 
org.apache.activemq.util.MarshallingSupport.MaxInflatedDataSizeExceededException;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MessageCompressionTest {
 
     private static final String BROKER_URL = "tcp://localhost:0";
     // The following text should compress well
@@ -47,20 +67,58 @@ public class MessageCompressionTest extends TestCase {
     private BrokerService broker;
     private ActiveMQQueue queue;
     private String connectionUri;
+    private final AtomicBoolean throwMaxInflatedException = new 
AtomicBoolean(false);
+    private final AtomicBoolean sentToDlq = new AtomicBoolean(false);
+
+    @Before
+    public void setUp() throws Exception {
+        throwMaxInflatedException.set(false);
+        sentToDlq.set(false);
 
-    protected void setUp() throws Exception {
         broker = new BrokerService();
+        broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
+            @Override
+            public Broker installPlugin(Broker broker) {
+                return new BrokerFilter(broker) {
+                    @Override
+                    public void preProcessDispatch(MessageDispatch 
messageDispatch) {
+                        super.preProcessDispatch(messageDispatch);
+                        // simulate a max inflated data size exception during 
protocol
+                        // conversion and decompression
+                        if (throwMaxInflatedException.get()) {
+                            try {
+                                throw new 
MaxInflatedDataSizeExceededException("Test");
+                            } catch (ActiveMQUnmarshalEOFException e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                    }
+
+                    @Override
+                    public boolean sendToDeadLetterQueue(ConnectionContext 
context,
+                            MessageReference messageReference, Subscription 
subscription,
+                            Throwable poisonCause) {
+                        sentToDlq.set(true);
+                        return super.sendToDeadLetterQueue(context, 
messageReference,
+                                subscription, poisonCause);
+                    }
+                };
+            }
+        }});
+
         connectionUri = 
broker.addConnector(BROKER_URL).getPublishableConnectString();
         broker.start();
         queue = new ActiveMQQueue("TEST." + System.currentTimeMillis());
     }
 
-    protected void tearDown() throws Exception {
+    @After
+    public void tearDown() throws Exception {
         if (broker != null) {
             broker.stop();
         }
     }
 
+    @Test
     public void testTextMessageCompression() throws Exception {
 
         ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(connectionUri);
@@ -79,6 +137,7 @@ public class MessageCompressionTest extends TestCase {
                 compressedSize < unCompressedSize);
     }
 
+    @Test
     public void testBytesMessageCompression() throws Exception {
 
         ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(connectionUri);
@@ -86,9 +145,9 @@ public class MessageCompressionTest extends TestCase {
         sendTestBytesMessage(factory, TEXT);
         ActiveMQBytesMessage message = receiveTestBytesMessage(factory);
         int compressedSize = message.getContent().getLength();
-        byte[] bytes = new byte[TEXT.getBytes("UTF8").length];
+        byte[] bytes = new byte[TEXT.getBytes(StandardCharsets.UTF_8).length];
         message.readBytes(bytes);
-        assertTrue(message.readBytes(new byte[255]) == -1);
+        assertEquals(-1, message.readBytes(new byte[255]));
         String rcvString = new String(bytes, "UTF8");
         assertEquals(TEXT, rcvString);
         assertTrue(message.isCompressed());
@@ -100,7 +159,55 @@ public class MessageCompressionTest extends TestCase {
         int unCompressedSize = message.getContent().getLength();
 
         assertTrue("expected: compressed Size '" + compressedSize + "' < 
unCompressedSize '" + unCompressedSize + "'",
-                   compressedSize < unCompressedSize);
+                compressedSize < unCompressedSize);
+    }
+
+    // Test that an error during dispatch goes to the DLQ
+    @Test
+    public void testMaxInflatedSizeDlq() throws Exception {
+
+        ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(connectionUri);
+        factory.setUseCompression(true);
+        ActiveMQConnection con1 = (ActiveMQConnection) 
factory.createConnection();
+        con1.start();
+
+        Session session1 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session1.createProducer(queue);
+        ActiveMQBytesMessage bytesMessage = (ActiveMQBytesMessage) 
session1.createBytesMessage();
+        bytesMessage.writeBytes(TEXT.getBytes(StandardCharsets.UTF_8));
+        producer.send(bytesMessage);
+
+        assertTrue(Wait.waitFor(() -> broker.getDestination(queue)
+                .getDestinationStatistics().getMessages().getCount() == 1, 
1000, 10));
+        assertFalse(sentToDlq.get());
+
+        // simulate a decompression error
+        // this should poison ack and DLQ and we shouldn't get the message
+        // but the connection should still be open
+        this.throwMaxInflatedException.set(true);
+
+        ActiveMQConnection con2 = (ActiveMQConnection) 
factory.createConnection();
+        con2.start();
+        Session session2 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session2.createConsumer(queue);
+        assertNull(consumer.receive(1000));
+
+        // verify message is gone off the dest and went to the DLQ
+        assertTrue(Wait.waitFor(() -> broker.getDestination(queue)
+                .getDestinationStatistics().getMessages().getCount() == 0, 
500, 10));
+        assertTrue(sentToDlq.get());
+
+        // no longer throw an exception
+        this.throwMaxInflatedException.set(false);
+        sentToDlq.set(false);
+
+        // exception has been disabled so we should receive again on the same 
connection
+        producer.send(bytesMessage);
+        assertNotNull(consumer.receive(1000));
+        assertFalse(sentToDlq.get());
+
+        con1.close();
+        con2.close();
     }
 
     private void sendTestMessage(ActiveMQConnectionFactory factory, String 
message) throws JMSException {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to