Copilot commented on code in PR #16041:
URL: https://github.com/apache/dubbo/pull/16041#discussion_r2720073775


##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java:
##########
@@ -125,6 +125,7 @@ public static MethodDescriptor findTripleMethodDescriptor(
             ServiceDescriptor serviceDescriptor, String methodName, 
InputStream rawMessage) throws IOException {
         MethodDescriptor methodDescriptor = 
findReflectionMethodDescriptor(serviceDescriptor, methodName);
         if (methodDescriptor == null) {

Review Comment:
   The call to `rawMessage.mark(Integer.MAX_VALUE)` assumes the InputStream 
supports mark/reset and has a sufficient buffer. While BoundedInputStream (used 
in the unified deframer) extends BufferedInputStream and supports this, 
consider adding a check or documentation to ensure all callers pass 
InputStreams that support mark/reset with adequate buffer size. If an 
InputStream that doesn't support mark/reset is passed, this will fail silently 
and the subsequent `reset()` on line 144 will throw an IOException.
   ```suggestion
           if (methodDescriptor == null) {
               if (!rawMessage.markSupported()) {
                   throw new IOException("InputStream does not support 
mark/reset, which is required to resolve overloaded triple methods.");
               }
   ```



##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java:
##########
@@ -262,6 +271,82 @@ protected byte[] readRawMessage(InputStream inputStream, 
int length) throws IOEx
         return data;
     }
 
+    protected static class MessageStream {
+
+        public final InputStream inputStream;
+        public final int length;
+
+        public MessageStream(InputStream inputStream, int length) {
+            this.inputStream = inputStream;
+            this.length = length;
+        }
+    }
+
+    /**
+     * A bounded InputStream that reads at most 'limit' bytes from the source 
stream.
+     * Extends BufferedInputStream to support mark/reset, which is required by
+     * deserializers like Hessian2.
+     */
+    private static class BoundedInputStream extends BufferedInputStream {
+
+        private final int limit;
+        private int remaining;
+        private int markedRemaining;
+
+        public BoundedInputStream(InputStream source, int limit) {
+            super(source, limit);
+            this.limit = limit;
+            this.remaining = limit;
+            this.markedRemaining = limit;
+        }
+
+        @Override
+        public int read() throws IOException {
+            if (remaining <= 0) {
+                return -1;
+            }
+            int result = super.read();
+            if (result != -1) {
+                remaining--;
+            }
+            return result;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {

Review Comment:
   Method 'read' overrides a synchronized method in 
[java.io.BufferedInputStream](1) but is not synchronized.



##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java:
##########
@@ -262,6 +271,82 @@ protected byte[] readRawMessage(InputStream inputStream, 
int length) throws IOEx
         return data;
     }
 
+    protected static class MessageStream {
+
+        public final InputStream inputStream;
+        public final int length;
+
+        public MessageStream(InputStream inputStream, int length) {
+            this.inputStream = inputStream;
+            this.length = length;
+        }
+    }
+
+    /**
+     * A bounded InputStream that reads at most 'limit' bytes from the source 
stream.
+     * Extends BufferedInputStream to support mark/reset, which is required by
+     * deserializers like Hessian2.
+     */
+    private static class BoundedInputStream extends BufferedInputStream {
+
+        private final int limit;
+        private int remaining;
+        private int markedRemaining;
+
+        public BoundedInputStream(InputStream source, int limit) {
+            super(source, limit);
+            this.limit = limit;
+            this.remaining = limit;
+            this.markedRemaining = limit;
+        }
+
+        @Override
+        public int read() throws IOException {
+            if (remaining <= 0) {
+                return -1;
+            }
+            int result = super.read();
+            if (result != -1) {
+                remaining--;
+            }
+            return result;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            if (remaining <= 0) {
+                return -1;
+            }
+            int toRead = Math.min(len, remaining);
+            int result = super.read(b, off, toRead);
+            if (result > 0) {
+                remaining -= result;
+            }
+            return result;
+        }
+
+        @Override
+        public int available() throws IOException {

Review Comment:
   Method 'available' overrides a synchronized method in 
[java.io.BufferedInputStream](1) but is not synchronized.
   ```suggestion
           public synchronized int available() throws IOException {
   ```



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java:
##########
@@ -583,7 +589,9 @@ private void doOnData(ByteBuf data, boolean endStream) {
                 
handleH2TransportError(TriRpcStatus.INTERNAL.withDescription("headers not 
received before payload"));

Review Comment:
   When headers are not received and the method returns early, the ByteBuf 
'data' is not released, causing a potential memory leak. Since line 594 creates 
a ByteBufInputStream with auto-release (second parameter 'true'), but this line 
is never reached when returning early at line 590, the ByteBuf will not be 
released. Add ReferenceCountUtil.release(data) before returning at line 590, 
similar to the error handling at line 582.
   ```suggestion
                   
handleH2TransportError(TriRpcStatus.INTERNAL.withDescription("headers not 
received before payload"));
                   ReferenceCountUtil.release(data);
   ```



##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java:
##########
@@ -262,6 +271,82 @@ protected byte[] readRawMessage(InputStream inputStream, 
int length) throws IOEx
         return data;
     }
 
+    protected static class MessageStream {
+
+        public final InputStream inputStream;
+        public final int length;
+
+        public MessageStream(InputStream inputStream, int length) {
+            this.inputStream = inputStream;
+            this.length = length;
+        }
+    }
+
+    /**
+     * A bounded InputStream that reads at most 'limit' bytes from the source 
stream.
+     * Extends BufferedInputStream to support mark/reset, which is required by
+     * deserializers like Hessian2.
+     */
+    private static class BoundedInputStream extends BufferedInputStream {
+
+        private final int limit;
+        private int remaining;
+        private int markedRemaining;
+
+        public BoundedInputStream(InputStream source, int limit) {
+            super(source, limit);
+            this.limit = limit;
+            this.remaining = limit;
+            this.markedRemaining = limit;
+        }
+
+        @Override
+        public int read() throws IOException {

Review Comment:
   Method 'read' overrides a synchronized method in 
[java.io.BufferedInputStream](1) but is not synchronized.



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcStreamingDecoder.java:
##########
@@ -50,12 +50,13 @@ protected void processOffset(InputStream inputStream, int 
lengthFieldOffset) thr
     }
 
     @Override
-    protected byte[] readRawMessage(InputStream inputStream, int length) 
throws IOException {
-        byte[] rawMessage = super.readRawMessage(inputStream, length);
-        return compressedFlag ? deCompressedMessage(rawMessage) : rawMessage;
-    }
-
-    private byte[] deCompressedMessage(byte[] rawMessage) {
-        return deCompressor.decompress(rawMessage);
+    protected MessageStream readMessageStream(InputStream inputStream, int 
length) throws IOException {
+        if (compressedFlag) {
+            // For compressed messages, we need to read bytes first, then 
decompress
+            byte[] rawMessage = readRawMessage(inputStream, length);
+            byte[] decompressed = deCompressor.decompress(rawMessage);
+            return new MessageStream(new 
java.io.ByteArrayInputStream(decompressed), decompressed.length);
+        }
+        return super.readMessageStream(inputStream, length);
     }
 }

Review Comment:
   The refactoring to use GrpcStreamingDecoder (unified deframer) removes the 
Triple-specific TriDecoder tests without adding equivalent test coverage. While 
GrpcStreamingDecoder is tested elsewhere, consider adding tests that verify the 
Triple protocol's specific usage patterns, including: compressed/uncompressed 
message handling, proper flow control through bytesRead callbacks, and correct 
integration with the DeCompressor component.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to