This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.19.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.19.x by this push:
     new 38ac99fba7 Improve handling of partial reads and EOF in Frame size 
limited filter (#2154) (#2156)
38ac99fba7 is described below

commit 38ac99fba792af0a09950d33f634465613538e3e
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Tue Jun 23 20:00:38 2026 -0400

    Improve handling of partial reads and EOF in Frame size limited filter 
(#2154) (#2156)
    
    This update makes the following changes/improvements:
    
    * If the wrapped stream is finished and returns -1 this is now returned
      correctly and availableBytes are not decremented.
    * Partial reads now will be attempted up to the availableBytes limit
    * If 0 is passed in for the length for a partial read then no attempt to
      read is done and 0 is returned, in line with the spec.
    
    (cherry picked from commit 5d9c3036b662995642b47a6ecdb4e5eb652621c9)
---
 .../FrameSizeLimitedFilterInputStream.java         | 34 +++++++++++++++-------
 .../FrameSizeLimitedFilterInputStreamTest.java     | 33 +++++++++++++++++++--
 2 files changed, 54 insertions(+), 13 deletions(-)

diff --git 
a/activemq-client/src/main/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStream.java
 
b/activemq-client/src/main/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStream.java
index 419d31561a..09818e19b5 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStream.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStream.java
@@ -151,7 +151,8 @@ public class FrameSizeLimitedFilterInputStream extends 
InputStream {
 
         final int read = stream.read();
 
-        reduceAvailable(1);
+        // if -1 then the stream is done
+        reduceAvailable(read >= 0 ? 1 : -1);
 
         return read;
     }
@@ -165,16 +166,25 @@ public class FrameSizeLimitedFilterInputStream extends 
InputStream {
     public int read(byte[] b, int off, int len) throws IOException {
         Objects.requireNonNull(stream, "The stream wrapper has not been bound 
to a source input stream");
 
-        // It is technically permissible for this method to read up to 
available
-        // bytes if the length is greater than that but it is likely not going 
to
-        // result in outcomes we can predict as easily so for now this is 
limited
-        // and just throws for anything over available bytes. This could be 
changed
-        // to call a read using Math.min(availableBytes, length) but what could
-        // happen is we get into a read loop where we endlessly return end of 
stream
-        // which won't send the signal that a read past the max limit was 
triggered.
-        validateAvailable(len, availableBytes);
+        // If length is 0, this method is supposed to just return 0 with
+        // no bytes being read
+        if (len == 0) {
+            return 0;
+        }
+
+        final int toRead;
+        if (availableBytes > 0) {
+            // read the smaller of availableBytes or the length
+            // this method allows partial reads less than len
+            toRead = Math.min(len, availableBytes);
+        } else {
+            // we have no more remaining but there is data left so trigger 
error
+            toRead = 1;
+        }
 
-        return reduceAvailable(stream.read(b, off, len));
+        validateAvailable(toRead, availableBytes);
+
+        return reduceAvailable(stream.read(b, off, toRead));
     }
 
     @Override
@@ -230,6 +240,10 @@ public class FrameSizeLimitedFilterInputStream extends 
InputStream {
     }
 
     private int reduceAvailable(int amount) throws IOException {
+        if (amount == -1) {
+            return -1; // Underlying says there are no more bytes
+        }
+
         try {
             availableBytes = Math.subtractExact(availableBytes, amount);
         } catch (ArithmeticException e) {
diff --git 
a/activemq-client/src/test/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStreamTest.java
 
b/activemq-client/src/test/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStreamTest.java
index 9be7eb50c6..b130c87468 100644
--- 
a/activemq-client/src/test/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStreamTest.java
+++ 
b/activemq-client/src/test/java/org/apache/activemq/transport/FrameSizeLimitedFilterInputStreamTest.java
@@ -152,6 +152,27 @@ public class FrameSizeLimitedFilterInputStreamTest {
             }
 
             assertThrows(IOException.class, () -> stream.read());
+            // docs say len of 0 just returns 0 and no attempt to read is made
+            assertEquals(0, stream.read(new byte[10], 0, 0));
+        }
+    }
+
+    @Test
+    public void testReadBytesStreamLessThanLimit() throws IOException {
+        final ByteArrayInputStream bais = new 
ByteArrayInputStream(createPayload());
+
+        try (FrameSizeLimitedFilterInputStream stream = new 
FrameSizeLimitedFilterInputStream(Integer.MAX_VALUE, bais)) {
+            for (int i = 0; i < DEFAULT_TEST_PAYLOAD_SIZE; ++i) {
+                assertEquals(i, stream.read());
+            }
+
+            // Stream should return -1 because we are finished but less than 
the
+            // limit of the wrapper
+            assertEquals(-1, stream.read());
+            assertEquals(-1, stream.read(new byte[10]));
+            assertEquals(-1, stream.read(new byte[10], 0, 10));
+            // docs say len of 0 just returns 0
+            assertEquals(0, stream.read(new byte[10], 0, 0));
         }
     }
 
@@ -387,10 +408,16 @@ public class FrameSizeLimitedFilterInputStreamTest {
             assertEquals(6, stream.available());
             assertEquals(4, stream.read());
 
-            assertThrows(IOException.class, () -> stream.read(new byte[10], 0, 
10));
+            // partial read should work
+            byte[] data = new byte[10];
+            assertEquals(5, stream.read(data, 0, 10));
+            for (int i = 0; i < 5; i++) {
+                assertEquals(i + 5, data[i]);
+            }
 
-            assertEquals(5, stream.available());
-            assertEquals(5, stream.read());
+            assertEquals(0, stream.available());
+            // availableBytes has been exhausted so this will error
+            assertThrows(IOException.class, stream::read);
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to