This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch activemq-5.19.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.19.x by this push:
new cb03c480d3 Add better frame size validation for AMQP (#2167) (#2172)
cb03c480d3 is described below
commit cb03c480d3829f05ffcfe9b4bd7ef2d196ee438d
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Thu Jul 2 08:40:20 2026 -0400
Add better frame size validation for AMQP (#2167) (#2172)
Add a negative size check for AMQP NIO transports.
This also adds a similar check to OpenWire. For OpenWire
the same IllegalArgumentException is thrown but this
improves the error message in the NIO transport.
(cherry picked from commit 6e9dd5b0aee4cef5640fe4fde5d7736ab5b8464e)
---
.../activemq/transport/amqp/AmqpFrameParser.java | 5 +--
.../activemq/transport/amqp/AmqpWireFormat.java | 17 +++++---
.../amqp/protocol/AmqpFrameParserTest.java | 46 ++++++++++++++++++++++
.../amqp/protocol/AmqpWireFormatTest.java | 45 +++++++++++++++++++++
.../apache/activemq/openwire/OpenWireFormat.java | 4 +-
.../activemq/transport/nio/NIOSSLTransport.java | 6 +++
.../activemq/transport/nio/NIOTransport.java | 6 +++
7 files changed, 119 insertions(+), 10 deletions(-)
diff --git
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpFrameParser.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpFrameParser.java
index 06bc97b219..69f4918397 100644
---
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpFrameParser.java
+++
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpFrameParser.java
@@ -83,10 +83,7 @@ public class AmqpFrameParser {
if (wireFormat != null) {
maxFrameSize = wireFormat.getMaxFrameSize();
}
-
- if (frameSize > maxFrameSize) {
- throw IOExceptionSupport.createFrameSizeException(frameSize,
maxFrameSize);
- }
+ AmqpWireFormat.validateFrameSize(frameSize, maxFrameSize);
}
public void setWireFormat(AmqpWireFormat wireFormat) {
diff --git
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
index 033eae7c0b..a3b72f1e0b 100644
---
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
+++
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
@@ -30,6 +30,7 @@ import
org.apache.activemq.transport.amqp.message.InboundTransformer;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
@@ -109,11 +110,7 @@ public class AmqpWireFormat implements WireFormat {
return new AmqpHeader(magic, false);
} else {
int size = dataIn.readInt();
- if (size > maxFrameSize) {
- throw new AmqpProtocolException("Frame size exceeded max frame
length.");
- } else if (size <= 0) {
- throw new AmqpProtocolException("Frame size value was invalid:
" + size);
- }
+ validateFrameSize(size, maxFrameSize);
Buffer frame = new Buffer(size);
frame.bigEndianEditor().writeInt(size);
frame.readFrom(dataIn);
@@ -261,4 +258,14 @@ public class AmqpWireFormat implements WireFormat {
public void setIdleTimeout(int idelTimeout) {
this.idelTimeout = idelTimeout;
}
+
+ static void validateFrameSize(int frameSize, long maxFrameSize) throws
IOException {
+ if (frameSize < 0) {
+ throw new AmqpProtocolException("Frame size of " + frameSize + "
exceeds the maximum frame configured or supported frame size limit");
+ } else if (Integer.toUnsignedLong(frameSize) > maxFrameSize) {
+ throw IOExceptionSupport.createFrameSizeException(frameSize,
maxFrameSize);
+ } else if (Integer.compareUnsigned(frameSize, 8) < 0) {
+ throw new AmqpProtocolException("Frame size of " + frameSize + "
is smaller than the minimally viable frame size value");
+ }
+ }
}
diff --git
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/AmqpFrameParserTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/AmqpFrameParserTest.java
index 5f78aaaee3..f3cfc6874d 100644
---
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/AmqpFrameParserTest.java
+++
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/AmqpFrameParserTest.java
@@ -18,9 +18,11 @@ package org.apache.activemq.transport.amqp.protocol;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -57,6 +59,7 @@ public class AmqpFrameParserTest {
frames.add(frame);
}
});
+ amqpWireFormat.setMaxFrameSize(AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE);
codec.setWireFormat(amqpWireFormat);
}
@@ -345,6 +348,49 @@ public class AmqpFrameParserTest {
assertEquals(2, frames.size());
}
+ @Test
+ public void testNegativeFrameSize() throws Exception {
+ AmqpHeader inputHeader = new AmqpHeader();
+
+ DataByteArrayOutputStream output = new DataByteArrayOutputStream();
+ output.write(inputHeader.getBuffer());
+ // the value is unsigned so negative means too large
+ output.writeInt(-100);
+ output.close();
+
+ IOException e = assertThrows(IOException.class, () ->
codec.parse(output.toBuffer().toByteBuffer()));
+ assertTrue(e.getMessage().contains("exceeds the maximum frame
configured or supported frame size limit"));
+ }
+
+ @Test
+ public void testFrameSizeTooSmall() throws Exception {
+ AmqpHeader inputHeader = new AmqpHeader();
+
+ DataByteArrayOutputStream output = new DataByteArrayOutputStream();
+ output.write(inputHeader.getBuffer());
+ // less than 8 is too small
+ output.writeInt(3);
+ output.close();
+
+ IOException e = assertThrows(IOException.class, () ->
codec.parse(output.toBuffer().toByteBuffer()));
+ assertTrue(e.getMessage().contains("is smaller than the minimally
viable frame size value"));
+ }
+
+ @Test
+ public void testFrameSizeTooLarge() throws Exception {
+ amqpWireFormat.setMaxFrameSize(100);
+ AmqpHeader inputHeader = new AmqpHeader();
+
+ DataByteArrayOutputStream output = new DataByteArrayOutputStream();
+ output.write(inputHeader.getBuffer());
+ // size 300 is larger than the maxFrameSize of 100
+ output.writeInt(300);
+ output.close();
+
+ IOException e = assertThrows(IOException.class, () ->
codec.parse(output.toBuffer().toByteBuffer()));
+ assertTrue(e.getMessage().contains("larger than max allowed"));
+ }
+
private void assertHeadersEqual(AmqpHeader expected, AmqpHeader actual) {
assertTrue(expected.getBuffer().equals(actual.getBuffer()));
}
diff --git
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/AmqpWireFormatTest.java
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/AmqpWireFormatTest.java
index 7e320d24d3..1cc1530aaf 100644
---
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/AmqpWireFormatTest.java
+++
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/AmqpWireFormatTest.java
@@ -17,19 +17,29 @@
package org.apache.activemq.transport.amqp.protocol;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.transport.amqp.AmqpHeader;
import org.apache.activemq.transport.amqp.AmqpWireFormat;
import org.apache.activemq.transport.amqp.AmqpWireFormat.ResetListener;
+import org.apache.activemq.util.ByteSequence;
+import org.junit.Before;
import org.junit.Test;
public class AmqpWireFormatTest {
private final AmqpWireFormat wireFormat = new AmqpWireFormat();
+ @Before
+ public void setUp() throws Exception {
+ wireFormat.setMaxFrameSize(AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE);
+ }
+
@Test
public void testWhenSaslNotAllowedNonSaslHeaderIsInvliad() {
wireFormat.setAllowNonSaslConnections(false);
@@ -78,4 +88,39 @@ public class AmqpWireFormatTest {
wireFormat.resetMagicRead();
assertTrue(reset.get());
}
+
+
+ @Test
+ public void testNegativeFrameSize() throws Exception {
+ AmqpHeader inputHeader = new AmqpHeader();
+ wireFormat.unmarshal(new
ByteSequence(inputHeader.getBuffer().toByteArray()));
+
+ ByteSequence bs = new
ByteSequence(ByteBuffer.allocate(Integer.BYTES).putInt(-100).array());
+ IOException e = assertThrows(IOException.class, () ->
wireFormat.unmarshal(bs));
+ assertTrue(e.getMessage().contains("exceeds the maximum frame
configured or supported frame size limit"));
+ }
+
+ @Test
+ public void testFrameSizeTooSmall() throws Exception {
+ AmqpHeader inputHeader = new AmqpHeader();
+ wireFormat.unmarshal(new
ByteSequence(inputHeader.getBuffer().toByteArray()));
+
+ // less than 8
+ ByteSequence bs = new
ByteSequence(ByteBuffer.allocate(Integer.BYTES).putInt(3).array());
+ IOException e = assertThrows(IOException.class, () ->
wireFormat.unmarshal(bs));
+ assertTrue(e.getMessage().contains("is smaller than the minimally
viable frame size value"));
+ }
+
+ @Test
+ public void testFrameSizeTooLarge() throws Exception {
+ wireFormat.setMaxFrameSize(100);
+ AmqpHeader inputHeader = new AmqpHeader();
+ wireFormat.unmarshal(new
ByteSequence(inputHeader.getBuffer().toByteArray()));
+
+ // size 300 is larger than maxFrameSize
+ ByteSequence bs = new
ByteSequence(ByteBuffer.allocate(Integer.BYTES).putInt(300).array());
+ IOException e = assertThrows(IOException.class, () ->
wireFormat.unmarshal(bs));
+ assertTrue(e.getMessage().contains("larger than max allowed"));
+ }
+
}
diff --git
a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
index 62373111b8..6459453342 100644
---
a/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
+++
b/activemq-client/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
@@ -211,6 +211,7 @@ public final class OpenWireFormat implements WireFormat {
if (maxFrameSizeEnabled && size > maxFrameSize) {
throw IOExceptionSupport.createFrameSizeException(size,
maxFrameSize);
}
+ // This will also verify size is not negative
context.setFrameSize(size);
}
return doUnmarshal(bytesIn);
@@ -286,11 +287,12 @@ public final class OpenWireFormat implements WireFormat {
final var context = new MarshallingContext();
marshallingContext.set(context);
- if (!sizePrefixDisabled) {
+ if (!sizePrefixDisabled) {
int size = dis.readInt();
if (maxFrameSizeEnabled && size > maxFrameSize) {
throw IOExceptionSupport.createFrameSizeException(size,
maxFrameSize);
}
+ // This will also verify size is not negative
context.setFrameSize(size);
}
return doUnmarshal(dis);
diff --git
a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
index 606503288c..384f3f2af0 100644
---
a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
+++
b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
@@ -374,6 +374,12 @@ public class NIOSSLTransport extends NIOTransport {
}
}
+ // This isn't strictly necessary as ByteBuffer.allocate() would
also throw the same
+ // IllegalArgumentException but this provides a better error.
+ if (nextFrameSize < 0) {
+ throw new IllegalArgumentException("Frame size value " +
nextFrameSize + " may not be negative.");
+ }
+
// now we got the data, lets reallocate and store the size for the
marshaler.
// if there's more data in plain, then the next call will start
processing it.
currentBuffer = ByteBuffer.allocate(nextFrameSize + 4);
diff --git
a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
index 10aae3dd4b..d0126ff8d4 100644
---
a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
+++
b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
@@ -148,6 +148,12 @@ public class NIOTransport extends TcpTransport {
}
}
+ // This isn't strictly necessary as
ByteBuffer.allocateDirect() would also throw the same
+ // IllegalArgumentException, but this provides a better
error message for OpenWire.
+ if (nextFrameSize < 0) {
+ throw new IllegalArgumentException("Frame size value "
+ nextFrameSize + " may not be negative.");
+ }
+
if (nextFrameSize > inputBuffer.capacity()) {
currentBuffer =
ByteBuffer.allocateDirect(nextFrameSize);
currentBuffer.putInt(nextFrameSize);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact