This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch activemq-5.19.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.19.x by this push:
new e6fac68191 Provide a flexible filter style input stream that limits
read amounts (#2118) (#2121)
e6fac68191 is described below
commit e6fac68191c96f8252c625572e9bd05ec1432a79
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Mon Jun 15 16:20:56 2026 -0400
Provide a flexible filter style input stream that limits read amounts
(#2118) (#2121)
This commit supplies a utility type that acts as a filter input stream
style stream wrapper than can be configured with an available bytes
window which decreases as bytes are read from the stream. This allows
a transport or other utility to limit what can be read based on a fixed
amount such as a max frame size option.
(cherry picked from commit 81ab8875bef634ee33026ed180586d2d36040bfb)
Co-authored-by: Timothy Bish <[email protected]>
---
.../FrameSizeLimitedFilterInputStream.java | 247 +++++++++++
.../FrameSizeLimitedFilterInputStreamTest.java | 492 +++++++++++++++++++++
2 files changed, 739 insertions(+)
diff --git
a/activemq-client/src/main/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStream.java
b/activemq-client/src/main/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStream.java
new file mode 100644
index 0000000000..f96b12427d
--- /dev/null
+++
b/activemq-client/src/main/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStream.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Objects;
+
+/**
+ * A filtered style input stream that allows reads up to a given known max
frame size
+ * before it starts to throw exceptions indicating the reader has exceeded the
set
+ * limit. This can be used to wrap another stream that contains a protocol
frame to
+ * be parsed and enforce that decoding of that frame does not cross the
boundary set
+ * as the max available bytes before error.
+ * <p>
+ * This is a specialized stream type that may obfuscate the actual state of
the underlying
+ * stream such as its actual available bytes. The user should be aware of the
behavior of
+ * this stream when using it to ensure they do not run into unexpected
failures. It is possible
+ * to configure this stream with a higher available limit than the underlying
stream actually
+ * has access to but that inconsistency is left as a requirement for the
caller to handle.
+ */
+public class FrameSizeLimitedFilterInputStream extends InputStream {
+
+ private boolean canMark;
+
+ private int maxAvailableBytes;
+ private int availableBytes;
+
+ private int markLimit;
+ private int markRemaining;
+
+ private InputStream stream;
+
+ /**
+ * Create a new uninitialized instance of the filter stream that will fail
to
+ * read until a stream and a frame size limit is configured.
+ */
+ public FrameSizeLimitedFilterInputStream() {
+ this.maxAvailableBytes = 0;
+ this.availableBytes = maxAvailableBytes;
+ }
+
+ /**
+ * Create a new instance with the given amount of available bytes that
should be
+ * readable before an exception is thrown indicating that more bytes where
requested
+ * from the known fixed frame size than is allowed.
+ *
+ * @param available
+ * The number of available bytes to allow in a given frame.
+ * @param in
+ * The {@link InputStream} to read from (cannot be null).
+ */
+ public FrameSizeLimitedFilterInputStream(int available, InputStream in) {
+ if (available < 0) {
+ throw new IllegalArgumentException("Available bytes needs to be a
positive integer but was: " + available);
+ }
+
+ this.stream = Objects.requireNonNull(in);
+ this.canMark = in.markSupported();
+ this.maxAvailableBytes = available;
+ this.availableBytes = maxAvailableBytes;
+ }
+
+ /**
+ * Render the stream unusable until a reset is called that either changes
+ * the stream and assigns a new max or simply assigns a new max which
+ * assumes that the underlying stream remains readable which is only a
+ * subset of stream types such as byte array wrapper variants.
+ */
+ @Override
+ public void close() throws IOException {
+ maxAvailableBytes = availableBytes = markLimit = markRemaining = 0;
+ canMark = false;
+ if (stream != null) {
+ stream.close();
+ }
+ }
+
+ /**
+ * Resets the number of available bytes that can be read from the
underlying
+ * stream. The underlying stream may still throw exceptions if it cannot
provide
+ * this many bytes. As a result of calling this method any currently set
mark
+ * is cleared and the stream cannot be reset back to a previously available
+ * number of bytes from this point onward.
+ * <p>
+ * Calling this method on a stream wrapper that has not been initialized
will
+ * not result in a readable state, the limit remains zero.
+ */
+ public void resetAvailable() {
+ resetAvailable(maxAvailableBytes);
+ }
+
+ /**
+ * Resets the number of available bytes that can be read from the
underlying
+ * stream to the new amount. The underlying stream may still throw
exceptions
+ * if it cannot provide this many bytes. As a result of calling this method
+ * any currently set mark is cleared and the stream cannot be reset back
to a
+ * previously available number of bytes from this point onward.
+ *
+ * @param available
+ * The new available number of bytes to allow from this
stream wrapper
+ */
+ public void resetAvailable(int available) {
+ resetAvailable(stream, available);
+ }
+
+ /**
+ * Resets the number of available bytes and assigns a new stream that can
be read
+ * from the which allows this type to be re-usable across command reads.
The underlying
+ * stream may still throw exceptions if it cannot provide this many bytes.
As a result
+ * of calling this method any currently set mark is cleared and the stream
cannot be
+ * reset back to a previously available number of bytes from this point
onward.
+ *
+ * @param in
+ * The new input stream to read bytes from (cannot be
assigned as null).
+ * @param available
+ * The new available number of bytes to allow from this
stream wrapper
+ */
+ public void resetAvailable(InputStream in, int available) {
+ if (available < 0) {
+ throw new IllegalArgumentException("Available bytes needs to be a
positive integer but was: " + available);
+ }
+
+ availableBytes = maxAvailableBytes = available;
+ markLimit = markRemaining = 0;
+ stream = Objects.requireNonNull(in);
+ canMark = stream.markSupported();
+ }
+
+ @Override
+ public int read() throws IOException {
+ Objects.requireNonNull(stream, "The stream wrapper has not been bound
to a source input stream");
+
+ validateAvailable(1, availableBytes);
+
+ final int read = stream.read();
+
+ reduceAvailable(1);
+
+ return read;
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ Objects.requireNonNull(stream, "The stream wrapper has not been bound
to a source input stream");
+
+ // It is technically permissible for this method to read up to
available
+ // bytes if the length is greater than that but it is likely not going
to
+ // result in outcomes we can predict as easily so for now this is
limited
+ // and just throws for anything over available bytes. This could be
changed
+ // to call a read using Math.min(availableBytes, length) but what could
+ // happen is we get into a read loop where we endlessly return end of
stream
+ // which won't send the signal that a read past the max limit was
triggered.
+ validateAvailable(len, availableBytes);
+
+ return reduceAvailable(stream.read(b, off, len));
+ }
+
+ @Override
+ public long skip(long amount) throws IOException {
+ if (amount < 0) {
+ return 0;
+ }
+
+ Objects.requireNonNull(stream, "The stream wrapper has not been bound
to a source input stream");
+
+ final int safeSkipRange = (int) Math.min(Integer.MAX_VALUE, amount);
+
+ // Max frame size is limited to Integer.MAX_VALUE as we store that
value as an integer
+ // so don't accept more than that amount which is valid and does allow
the caller to
+ // skip that full massive frame but will fail on the next stream
operation.
+ validateAvailable(safeSkipRange, availableBytes);
+
+ return reduceAvailable((int) stream.skip(safeSkipRange));
+ }
+
+ @Override
+ public int available() throws IOException {
+ return availableBytes;
+ }
+
+ @Override
+ public void mark(int readLimit) {
+ if (canMark && readLimit > 0) {
+ markLimit = markRemaining = readLimit;
+ stream.mark(readLimit);
+ }
+ }
+
+ @Override
+ public void reset() throws IOException {
+ if (canMark && markLimit > 0) {
+ availableBytes += markLimit - markRemaining;
+ markRemaining = markLimit = 0;
+ stream.reset();
+ }
+ }
+
+ @Override
+ public boolean markSupported() {
+ return canMark;
+ }
+
+ private static void validateAvailable(int requested, int available) throws
IOException {
+ if (requested > available) {
+ throw new IOException(String.format(
+ "Cannot read more than the max available %d bytes: requested
%d", available, requested));
+ }
+ }
+
+ private int reduceAvailable(int amount) throws IOException {
+ try {
+ availableBytes = Math.subtractExact(availableBytes, amount);
+ } catch (ArithmeticException e) {
+ throw new IOException(e);
+ }
+
+ if (markLimit > 0) {
+ markRemaining = markRemaining - amount;
+ if (markRemaining < 0) {
+ markLimit = markRemaining = 0;
+ }
+ }
+
+ return amount;
+ }
+}
diff --git
a/activemq-client/src/test/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStreamTest.java
b/activemq-client/src/test/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStreamTest.java
new file mode 100644
index 0000000000..9be7eb50c6
--- /dev/null
+++
b/activemq-client/src/test/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStreamTest.java
@@ -0,0 +1,492 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import org.junit.Test;
+
+/**
+ * Tests for FrameSizeLimitedFilterInputStream
+ */
+public class FrameSizeLimitedFilterInputStreamTest {
+
+ private static final int DEFAULT_TEST_PAYLOAD_SIZE = 256;
+
+ private byte[] createPayload() {
+ final byte[] data = new byte[DEFAULT_TEST_PAYLOAD_SIZE];
+
+ for (int i = 0; i < DEFAULT_TEST_PAYLOAD_SIZE; ++i) {
+ data[i] = (byte) i;
+ }
+
+ return data;
+ }
+
+ @Test
+ public void testCreate() throws IOException {
+ final ByteArrayInputStream bais = new ByteArrayInputStream(new
byte[2048]);
+
+ try (FrameSizeLimitedFilterInputStream stream = new
FrameSizeLimitedFilterInputStream(1024, bais)) {
+ assertTrue(stream.markSupported());
+ assertEquals(1024, stream.available());
+ }
+ }
+
+ @Test
+ public void testCreateChecks() throws IOException {
+ assertThrows(NullPointerException.class, () -> new
FrameSizeLimitedFilterInputStream(1024, null));
+ assertThrows(IllegalArgumentException.class, () -> new
FrameSizeLimitedFilterInputStream(-1, new ByteArrayInputStream(new byte[0])));
+ }
+
+ @Test
+ public void testCreateUnbound() throws IOException {
+ try (FrameSizeLimitedFilterInputStream stream = new
FrameSizeLimitedFilterInputStream()) {
+ assertThrows(NullPointerException.class, () -> stream.read());
+ assertThrows(NullPointerException.class, () -> stream.skip(1));
+ assertThrows(NullPointerException.class, () -> stream.read(new
byte[0]));
+ assertThrows(NullPointerException.class, () -> stream.read(new
byte[1], 0, 1));
+ }
+ }
+
+ @Test
+ public void testUnusableAfterClose() throws IOException {
+ final ByteArrayInputStream bais = new ByteArrayInputStream(new
byte[2048]);
+
+ try (FrameSizeLimitedFilterInputStream stream = new
FrameSizeLimitedFilterInputStream(1024, bais)) {
+ stream.close();
+
+ assertThrows(IOException.class, () -> stream.read());
+ assertThrows(IOException.class, () -> stream.skip(1));
+ assertThrows(IOException.class, () -> stream.read(new byte[1]));
+ assertThrows(IOException.class, () -> stream.read(new byte[1], 0,
1));
+
+ assertFalse(stream.markSupported());
+
+ // Should no-op and not throw
+ stream.mark(1);
+ stream.reset();
+ }
+ }
+
+ @Test
+ public void testReadByte() throws IOException {
+ final ByteArrayInputStream bais = new
ByteArrayInputStream(createPayload());
+
+ try (FrameSizeLimitedFilterInputStream stream = new
FrameSizeLimitedFilterInputStream(Byte.MAX_VALUE, bais)) {
+ for (int i = 0; i < Byte.MAX_VALUE; ++i) {
+ assertEquals(i, stream.read());
+ }
+
+ assertThrows(IOException.class, () -> stream.read());
+ }
+ }
+
+ @Test
+ public void testReadFailsAfterClosed() throws IOException {
+ final ByteArrayInputStream bais = new
ByteArrayInputStream(createPayload());
+
+ try (FrameSizeLimitedFilterInputStream stream = new
FrameSizeLimitedFilterInputStream(Byte.MAX_VALUE, bais)) {
+ stream.close();
+
+ assertThrows(IOException.class, () -> stream.read());
+ assertThrows(IOException.class, () -> stream.read(new byte[10]));
+ assertThrows(IOException.class, () -> stream.read(new byte[10], 0,
10));
+ }
+ }
+
+ @Test
+ public void testReadByteAndResetAvailable() throws IOException {
+ final ByteArrayInputStream bais = new
ByteArrayInputStream(createPayload());
+
+ try (FrameSizeLimitedFilterInputStream stream = new
FrameSizeLimitedFilterInputStream(1, bais)) {
+ assertEquals(0, stream.read());
+ assertThrows(IOException.class, () -> stream.read());
+
+ stream.resetAvailable();
+
+ assertEquals(1, stream.read());
+ assertThrows(IOException.class, () -> stream.read());
+
+ stream.resetAvailable(2);
+
+ assertEquals(2, stream.read());
+ assertEquals(3, stream.read());
+ assertThrows(IOException.class, () -> stream.read());
+ }
+ }
+
+ @Test
+ public void testReadBytes() throws IOException {
+ final ByteArrayInputStream bais = new
ByteArrayInputStream(createPayload());
+
+ try (FrameSizeLimitedFilterInputStream stream = new
FrameSizeLimitedFilterInputStream(Byte.MAX_VALUE, bais)) {
+ final byte[] sink = new byte[Byte.MAX_VALUE];
+
+ assertEquals(Byte.MAX_VALUE, stream.read(sink));
+
+ for (int i = 0; i < Byte.MAX_VALUE; ++i) {
+ assertEquals(i, sink[i]);
+ }
+
+ assertThrows(IOException.class, () -> stream.read());
+ }
+ }
+
+ @Test
+ public void testReadBytesAndResetAvailable() throws IOException {
+ final ByteArrayInputStream bais = new
ByteArrayInputStream(createPayload());
+ final byte[] sink = new byte[1];
+
+ try (FrameSizeLimitedFilterInputStream stream = new
FrameSizeLimitedFilterInputStream(1, bais)) {
+ assertEquals(1, stream.read(sink));
+ assertEquals(0, sink[0]);
+ assertThrows(IOException.class, () -> stream.read());
+
+ stream.resetAvailable();
+
+ assertEquals(1, stream.read(sink));
+ assertEquals(1, sink[0]);
+ assertThrows(IOException.class, () -> stream.read());
+
+ stream.resetAvailable(2);
+
+ assertEquals(1, stream.read(sink));
+ assertEquals(2, sink[0]);
+ assertEquals(1, stream.read(sink));
+ assertEquals(3, sink[0]);
+ assertThrows(IOException.class, () -> stream.read());
+ }
+ }
+
+ @Test
+ public void testReadBytesIndexed() throws IOException {
+ final ByteArrayInputStream bais = new
ByteArrayInputStream(createPayload());
+
+ try (FrameSizeLimitedFilterInputStream stream = new
FrameSizeLimitedFilterInputStream(Byte.MAX_VALUE, bais)) {
+ final byte[] sink = new byte[Byte.MAX_VALUE];
+
+ assertEquals(Byte.MAX_VALUE, stream.read(sink, 0, sink.length));
+
+ for (int i = 0; i < Byte.MAX_VALUE; ++i) {
+ assertEquals(i, sink[i]);
+ }
+
+ assertThrows(IOException.class, () -> stream.read(sink, 0,
sink.length));
+ }
+ }
+
+ @Test
+ public void testReadBytesIndexedAndResetAvailable() throws IOException {
+ final ByteArrayInputStream bais = new
ByteArrayInputStream(createPayload());
+ final byte[] sink = new byte[1];
+
+ try (FrameSizeLimitedFilterInputStream stream = new
FrameSizeLimitedFilterInputStream(1, bais)) {
+ assertEquals(1, stream.read(sink, 0, sink.length));
+ assertEquals(0, sink[0]);
+ assertThrows(IOException.class, () -> stream.read());
+
+ stream.resetAvailable();
+
+ assertEquals(1, stream.read(sink, 0, sink.length));
+ assertEquals(1, sink[0]);
+ assertThrows(IOException.class, () -> stream.read());
+
+ stream.resetAvailable(2);
+
+ assertEquals(1, stream.read(sink, 0, sink.length));
+ assertEquals(2, sink[0]);
+ assertEquals(1, stream.read(sink, 0, sink.length));
+ assertEquals(3, sink[0]);
+ assertThrows(IOException.class, () -> stream.read());
+ }
+ }
+
+ @Test
+ public void testSkipBytes() throws IOException {
+ final ByteArrayInputStream bais = new
ByteArrayInputStream(createPayload());
+
+ try (FrameSizeLimitedFilterInputStream stream = new
FrameSizeLimitedFilterInputStream(1, bais)) {
+ assertEquals(1, stream.skip(1));
+ assertThrows(IOException.class, () -> stream.skip(1));
+
+ stream.resetAvailable();
+
+ assertEquals(1, stream.skip(1));
+ assertThrows(IOException.class, () -> stream.skip(10));
+
+ stream.resetAvailable(2);
+
+ assertEquals(2, stream.skip(2));
+ assertThrows(IOException.class, () -> stream.skip(100));
+
+ stream.resetAvailable();
+
+ assertEquals(4, stream.read());
+ assertEquals(5, stream.read());
+
+ assertThrows(IOException.class, () -> stream.skip(1));
+ }
+ }
+
+ @Test
+ public void testSkipNegativeThrows() throws IOException {
+ final ByteArrayInputStream bais = new
ByteArrayInputStream(createPayload());
+
+ try (FrameSizeLimitedFilterInputStream stream = new
FrameSizeLimitedFilterInputStream(1, bais)) {
+ assertEquals(1, stream.available());
+ assertEquals(0, stream.skip(-1));
+ assertEquals(1, stream.available());
+ }
+ }
+
+ @Test
+ public void testSkipMassiveSize() throws IOException {
+ final ByteArrayInputStream bais = new
ByteArrayInputStream(createPayload()) {
+
+ @Override
+ public long skip(long amount) {
+ return amount;
+ }
+ };
+
+ try (FrameSizeLimitedFilterInputStream stream = new
FrameSizeLimitedFilterInputStream(Integer.MAX_VALUE, bais)) {
+ assertEquals(Integer.MAX_VALUE, stream.skip(Long.MAX_VALUE));
+
+ assertThrows(IOException.class, () -> stream.skip(1));
+ assertThrows(IOException.class, () -> stream.read());
+ assertThrows(IOException.class, () -> stream.read(new byte[1]));
+ assertThrows(IOException.class, () -> stream.read(new byte[1], 0,
1));
+ }
+ }
+
+ @Test
+ public void testSkipBytesPastConfiguredAvailable() throws IOException {
+ final ByteArrayInputStream bais = new
ByteArrayInputStream(createPayload());
+
+ try (FrameSizeLimitedFilterInputStream stream = new
FrameSizeLimitedFilterInputStream(1, bais)) {
+ assertThrows(IOException.class, () -> stream.skip(2));
+
+ assertEquals(1, stream.skip(1));
+
+ assertThrows(IOException.class, () -> stream.skip(1));
+ }
+ }
+
+ @Test
+ public void testBasicMark() throws IOException {
+ final ByteArrayInputStream bais = new
ByteArrayInputStream(createPayload());
+
+ assertTrue(bais.markSupported());
+
+ try (FrameSizeLimitedFilterInputStream stream = new
FrameSizeLimitedFilterInputStream(10, bais)) {
+ assertTrue(stream.markSupported());
+
+ stream.mark(1);
+ assertEquals(0, stream.read());
+ assertEquals(9, stream.available());
+
+ stream.reset();
+ assertEquals(10, stream.available());
+ assertEquals(0, stream.read());
+ assertEquals(9, stream.available());
+
+ stream.reset(); // Mark wasn't called
+ assertEquals(9, stream.available());
+ }
+ }
+
+ @Test
+ public void testMarkZeroIsIgnored() throws IOException {
+ final ByteArrayInputStream bais = new
ByteArrayInputStream(createPayload());
+
+ assertTrue(bais.markSupported());
+
+ try (FrameSizeLimitedFilterInputStream stream = new
FrameSizeLimitedFilterInputStream(10, bais)) {
+ assertTrue(stream.markSupported());
+
+ stream.mark(0);
+ assertEquals(0, stream.read());
+ assertEquals(9, stream.available());
+
+ stream.reset();
+ assertEquals(9, stream.available());
+ assertEquals(1, stream.read());
+ assertEquals(8, stream.available());
+
+ stream.reset(); // Mark wasn't called
+ assertEquals(8, stream.available());
+ }
+ }
+
+ @Test
+ public void testLastMarkWins() throws IOException {
+ final ByteArrayInputStream bais = new
ByteArrayInputStream(createPayload());
+
+ assertTrue(bais.markSupported());
+
+ try (FrameSizeLimitedFilterInputStream stream = new
FrameSizeLimitedFilterInputStream(10, bais)) {
+ assertTrue(stream.markSupported());
+
+ stream.mark(1);
+ stream.mark(2);
+ stream.mark(3);
+ stream.mark(4);
+
+ final byte[] read1 = new byte[4];
+ final byte[] read2 = new byte[4];
+
+ stream.read(read1);
+ assertEquals(6, stream.available());
+ stream.reset();
+ assertEquals(10, stream.available());
+ stream.read(read2);
+
+ assertArrayEquals(read1, read2);
+ }
+ }
+ @Test
+ public void testReadPastMarkClearsMark() throws IOException {
+ final ByteArrayInputStream bais = new
ByteArrayInputStream(createPayload());
+
+ assertTrue(bais.markSupported());
+
+ try (FrameSizeLimitedFilterInputStream stream = new
FrameSizeLimitedFilterInputStream(10, bais)) {
+ assertTrue(stream.markSupported());
+
+ stream.mark(3);
+
+ assertEquals(3, stream.read(new byte[3]));
+ assertEquals(3, stream.read()); // Past mark.
+ assertEquals(6, stream.available());
+
+ stream.reset(); // Should have no affect
+
+ assertEquals(6, stream.available());
+ assertEquals(4, stream.read());
+
+ assertThrows(IOException.class, () -> stream.read(new byte[10], 0,
10));
+
+ assertEquals(5, stream.available());
+ assertEquals(5, stream.read());
+ }
+ }
+
+ @Test
+ public void testMarkNotSupported() throws IOException {
+ final ByteArrayInputStream bais = new
ByteArrayInputStream(createPayload()) {
+
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+ };
+
+ assertFalse(bais.markSupported());
+
+ try (FrameSizeLimitedFilterInputStream stream = new
FrameSizeLimitedFilterInputStream(10, bais)) {
+ assertFalse(stream.markSupported());
+
+ stream.mark(1);
+ assertEquals(0, stream.read());
+ assertEquals(9, stream.available());
+
+ stream.reset();
+ assertEquals(9, stream.available());
+ assertEquals(1, stream.read());
+ assertEquals(8, stream.available());
+
+ stream.reset();
+ assertEquals(8, stream.available());
+
+ final byte[] first = new byte[8];
+
+ stream.mark(8);
+ stream.read(first);
+ assertEquals(0, stream.available());
+ stream.reset();
+ assertEquals(0, stream.available());
+
+ stream.resetAvailable(10);
+
+ final byte[] second = new byte[10];
+
+ stream.mark(10);
+ stream.read(second);
+ assertEquals(0, stream.available());
+ stream.reset();
+ assertEquals(0, stream.available());
+ }
+ }
+
+ @Test
+ public void testResetToNewStream() throws IOException {
+ final ByteArrayInputStream bais = new ByteArrayInputStream(new
byte[2048]);
+
+ try (FrameSizeLimitedFilterInputStream stream = new
FrameSizeLimitedFilterInputStream(2048, bais)) {
+ assertTrue(stream.markSupported());
+ assertEquals(2048, stream.available());
+
+ final ByteArrayInputStream nextStream = new
ByteArrayInputStream(new byte[1024]) {
+
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+ };
+
+ assertFalse(nextStream.markSupported());
+
+ stream.resetAvailable(nextStream, 1024);
+
+ assertFalse(stream.markSupported());
+ assertEquals(1024, stream.available());
+ }
+ }
+
+ @Test
+ public void testResetWithinSameStream() throws IOException {
+ final ByteArrayInputStream bais = new
ByteArrayInputStream(createPayload());
+
+ try (FrameSizeLimitedFilterInputStream stream = new
FrameSizeLimitedFilterInputStream(DEFAULT_TEST_PAYLOAD_SIZE, bais)) {
+ assertTrue(stream.markSupported());
+ assertEquals(DEFAULT_TEST_PAYLOAD_SIZE, stream.available());
+
+ assertEquals(0, stream.read());
+
+ assertThrows(IllegalArgumentException.class, () ->
stream.resetAvailable(-1));
+ stream.resetAvailable(1);
+
+ assertEquals(1, stream.read());
+ assertThrows(IOException.class, () -> stream.read());
+
+ stream.resetAvailable(2);
+
+ assertEquals(2, stream.read());
+ assertEquals(3, stream.read());
+ assertThrows(IOException.class, () -> stream.read());
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact