Copilot commented on code in PR #16072:
URL: https://github.com/apache/dubbo/pull/16072#discussion_r2768045297
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/compressor/Snappy.java:
##########
@@ -49,7 +50,8 @@ public byte[] compress(byte[] payloadByteArr) throws RpcException {
@Override
public OutputStream decorate(OutputStream outputStream) {
- return outputStream;
+ // Snappy wraps output with SnappyOutputStream for streaming compression
+ return new org.xerial.snappy.SnappyOutputStream(outputStream);
Review Comment:
The SnappyOutputStream constructor can throw IOException, but this is not
being caught. While Gzip and Bzip2 properly wrap IOExceptions from their stream
constructors in IllegalStateException (see lines 70-74 in Gzip.java and lines
76-80 in Bzip2.java), the Snappy implementation doesn't follow this pattern.
This will cause a compilation error. Wrap the construction in a try-catch block
and throw IllegalStateException, consistent with the other compressor
implementations.
```suggestion
try {
return new org.xerial.snappy.SnappyOutputStream(outputStream);
} catch (IOException e) {
throw new IllegalStateException(e);
}
```
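For reference, the wrap-and-rethrow pattern described above can be sketched with the JDK's GZIPOutputStream, whose constructor does declare IOException. The class name `StreamDecorators` and its methods are hypothetical and are not taken from the Dubbo sources; whether the SnappyOutputStream constructor declares the same checked exception should be confirmed against the snappy-java version in use.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper, for illustration only.
class StreamDecorators {

    // Wraps the target stream; the checked IOException from the GZIP
    // constructor (it writes the header eagerly) becomes unchecked.
    static OutputStream decorateGzip(OutputStream out) {
        try {
            return new GZIPOutputStream(out);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    // Convenience for callers that only need the compressed size.
    static int compressedSize(byte[] raw) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (OutputStream out = decorateGzip(sink)) {
            out.write(raw);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return sink.size();
    }
}
```

Callers of `decorateGzip` need no `throws` clause, which is the property a `decorate(OutputStream)` signature without checked exceptions relies on.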
##########
dubbo-common/src/main/java/org/apache/dubbo/common/io/UnsafeByteArrayInputStream.java:
##########
@@ -114,4 +115,15 @@ public void position(int newPosition) {
public int size() {
return mData == null ? 0 : mData.length;
}
+
+ /**
+ * Write remaining data directly to output stream without copying through intermediate buffer.
+ *
+ * @param out the output stream to write to
+ * @throws IOException if an I/O error occurs
+ */
+ public void writeTo(OutputStream out) throws IOException {
+ out.write(mData, mPosition, mLimit - mPosition);
+ mPosition = mLimit;
+ }
Review Comment:
The new writeTo(OutputStream) method in UnsafeByteArrayInputStream is not
tested. While integration tests may exercise this path through
StreamUtils.copy, consider adding a unit test to verify that: 1) data is
written correctly from mPosition to mLimit, 2) mPosition is updated to mLimit
after the write, and 3) the zero-copy behavior works correctly (writing
directly from the internal buffer).
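
A minimal sketch of the three checks listed above. It does not use the real UnsafeByteArrayInputStream: `PositionedBuffer` is a hypothetical stand-in that mirrors only the fields and the method shown in the diff, and `CapturingStream` and `WriteToSketch` are likewise illustrative names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical stand-in mirroring only the fields and method shown in the diff.
class PositionedBuffer {
    final byte[] data;
    final int limit;
    int position;

    PositionedBuffer(byte[] data, int offset, int length) {
        this.data = data;
        this.position = offset;
        this.limit = offset + length;
    }

    void writeTo(OutputStream out) throws IOException {
        out.write(data, position, limit - position);
        position = limit;
    }
}

// Records the array reference handed to write(byte[], int, int).
class CapturingStream extends ByteArrayOutputStream {
    byte[] lastSource;

    @Override
    public synchronized void write(byte[] b, int off, int len) {
        lastSource = b;
        super.write(b, off, len);
    }
}

class WriteToSketch {
    // Drains the remaining window of the buffer into a capturing sink.
    static CapturingStream drain(PositionedBuffer in) {
        CapturingStream sink = new CapturingStream();
        try {
            in.writeTo(sink);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return sink;
    }
}
```

A test against the real class could assert the same three properties: the bytes written equal the window from the current position to the limit, the position equals the limit afterwards, and the array passed to the sink is the internal buffer itself.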
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/Stream.java:
##########
@@ -43,7 +43,9 @@ interface Listener {
* streaming .
*
* @param message message received from remote peer as InputStream
- * @param messageLength the length of the message in bytes
+ * @param messageLength the length of the message in bytes.
+ * For compressed messages, this may be -1 to indicate unknown length
+ * since decompressed size cannot be known until all bytes are read.
Review Comment:
The documentation states "For compressed messages, this may be -1 to
indicate unknown length since decompressed size cannot be known until all bytes
are read." However, this is misleading. The -1 value is used when streaming
decompression is employed (as implemented in GrpcStreamingDecoder line 58), but
the actual decompressed size CAN be known for some compression formats (such as
gzip, which stores the original size modulo 2^32 in its trailer). The comment should clarify
that -1 is used when employing streaming decompression to avoid buffering the
entire decompressed message in memory, not because the size is fundamentally
unknowable.
```suggestion
* For compressed messages processed via streaming decompression,
* this may be -1 to indicate that the total decompressed length is
* not computed up front to avoid buffering the entire message in memory.
```
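The gzip point can be checked with the JDK alone. The sketch below reads the ISIZE field, which the gzip format (RFC 1952) places in the last four bytes of a member as a little-endian value holding the uncompressed size modulo 2^32. `GzipTrailer` and its method names are hypothetical; the sketch assumes a single-member gzip stream that is fully available as a byte array.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper, for illustration only.
class GzipTrailer {

    // Compresses raw bytes into a single-member gzip stream.
    static byte[] gzip(byte[] raw) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(sink)) {
            out.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }

    // Reads ISIZE: the last four bytes, little-endian, unsigned.
    static long originalSize(byte[] gz) {
        int n = gz.length;
        return (gz[n - 4] & 0xFFL)
                | (gz[n - 3] & 0xFFL) << 8
                | (gz[n - 2] & 0xFFL) << 16
                | (gz[n - 1] & 0xFFL) << 24;
    }
}
```

Because the field sits at the end of the member, it is only readable once the whole compressed payload is in hand, and it wraps around for inputs of 4 GiB or more.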
##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/StreamingDecoder.java:
##########
@@ -45,7 +45,9 @@ interface FragmentListener {
* Called when a complete message fragment is received.
*
* @param rawMessage raw message as InputStream
- * @param messageLength the length of the message payload in bytes
+ * @param messageLength the length of the message payload in bytes.
+ * For compressed messages, this may be -1 to indicate unknown length
+ * since decompressed size cannot be known until all bytes are read.
Review Comment:
The documentation states "For compressed messages, this may be -1 to
indicate unknown length since decompressed size cannot be known until all bytes
are read." However, this is misleading. The -1 value is used when streaming
decompression is employed (as implemented in GrpcStreamingDecoder line 58), but
the actual decompressed size CAN be known for some compression formats (such as
gzip, which stores the original size modulo 2^32 in its trailer). The comment should clarify
that -1 is used when employing streaming decompression to avoid buffering the
entire decompressed message in memory, not because the size is fundamentally
unknowable.
```suggestion
* For compressed messages using streaming decompression, this may be -1
* to indicate that the decompressed size is not precomputed in order to
* avoid buffering the entire message in memory.
```
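On the consumer side, a listener has to treat -1 as "read until end of stream" rather than as a size. A minimal sketch of that handling, assuming a hypothetical helper `LengthAwareReader` that is not part of Dubbo:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical helper, for illustration only.
class LengthAwareReader {

    // Reads the whole message. A non-negative messageLength is used as a
    // sizing hint and verified; -1 means the length is not known up front.
    static byte[] readMessage(InputStream message, int messageLength) {
        ByteArrayOutputStream out = messageLength >= 0
                ? new ByteArrayOutputStream(messageLength)
                : new ByteArrayOutputStream();
        byte[] chunk = new byte[8192];
        try {
            int n;
            while ((n = message.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
            if (messageLength >= 0 && out.size() != messageLength) {
                throw new IOException("expected " + messageLength
                        + " bytes but read " + out.size());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```

Buffering the whole message here is only for demonstration; a real listener would more likely hand the stream to a deserializer directly.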
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/compressor/Snappy.java:
##########
@@ -64,4 +66,9 @@ public byte[] decompress(byte[] payloadByteArr) {
throw new IllegalStateException(e);
}
}
+
+ @Override
+ public InputStream decompress(InputStream inputStream) throws IOException {
+ return new org.xerial.snappy.SnappyInputStream(inputStream);
+ }
Review Comment:
The new streaming compression methods (decorate and decompress(InputStream))
are not directly unit tested. While the existing compressor tests exercise the
deprecated byte[] methods and the integration tests (WriteQueueTest,
TripleClientStreamTest) exercise the new paths, consider adding explicit unit
tests for the streaming methods to ensure they properly handle edge cases like
empty streams, IOException propagation, and proper resource cleanup. This is
especially important since the deprecated methods will eventually be removed.
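
A rough shape for such tests, sketched with the JDK's GZIP streams as a stand-in because snappy-java is not part of the standard library. `StreamingRoundTrip` is a hypothetical name; a real test would call the compressor's `decorate` and `decompress(InputStream)` methods instead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical round-trip helper; GZIP stands in for the codec under test.
class StreamingRoundTrip {

    static byte[] compress(byte[] raw) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // try-with-resources closes the stream, which flushes the trailer.
        try (OutputStream out = new GZIPOutputStream(sink)) {
            out.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }

    static byte[] decompress(byte[] compressed) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
        } catch (IOException e) {
            // Malformed input surfaces here instead of being swallowed.
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```

The edge cases named above map onto three assertions: an empty payload round-trips to an empty array, a non-empty payload round-trips unchanged, and malformed input raises an exception.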
##########
dubbo-common/src/main/java/org/apache/dubbo/common/io/UnsafeByteArrayOutputStream.java:
##########
@@ -86,6 +86,15 @@ public void writeTo(OutputStream out) throws IOException {
out.write(mBuffer, 0, mCount);
}
+ /**
+ * Returns an InputStream that reads from the internal buffer without copying.
+ *
+ * @return an InputStream wrapping the internal buffer
+ */
+ public UnsafeByteArrayInputStream toInputStream() {
+ return new UnsafeByteArrayInputStream(mBuffer, 0, mCount);
+ }
Review Comment:
The new toInputStream() method in UnsafeByteArrayOutputStream is not tested.
While integration tests may exercise this path, consider adding a unit test to
verify that the returned InputStream correctly wraps the internal buffer
without copying, properly reflects the current size (mCount), and can be read
from the correct position.
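
A minimal sketch of those checks. `GrowableBuffer` is a hypothetical stand-in for UnsafeByteArrayOutputStream, and the JDK's ByteArrayInputStream, which wraps the array it is given without copying, stands in for UnsafeByteArrayInputStream.

```java
import java.io.ByteArrayInputStream;
import java.util.Arrays;

// Hypothetical stand-in with the same buffer-plus-count layout as the diff.
class GrowableBuffer {
    byte[] buffer = new byte[8];
    int count;

    void write(int b) {
        if (count == buffer.length) {
            buffer = Arrays.copyOf(buffer, buffer.length * 2);
        }
        buffer[count++] = (byte) b;
    }

    // Exposes only the written prefix [0, count) of the shared array.
    ByteArrayInputStream toInputStream() {
        return new ByteArrayInputStream(buffer, 0, count);
    }
}
```

One way to observe the no-copy behavior is to mutate the internal array after creating the stream and confirm that the stream returns the new value. The same sharing means that later writes which grow the buffer are not visible to a stream created earlier.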
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java:
##########
@@ -49,7 +49,9 @@ interface Listener {
* Callback when message received.
*
* @param message message received
- * @param actualContentLength actual content length from body
+ * @param actualContentLength actual content length from body.
+ * For compressed messages, this may be -1 to indicate unknown length
+ * since decompressed size cannot be known until all bytes are read.
Review Comment:
The documentation states "For compressed messages, this may be -1 to
indicate unknown length since decompressed size cannot be known until all bytes
are read." However, this is misleading. The -1 value is used when streaming
decompression is employed (as implemented in GrpcStreamingDecoder line 58), but
the actual decompressed size CAN be known for some compression formats (such as
gzip, which stores the original size modulo 2^32 in its trailer). The comment should clarify
that -1 is used when employing streaming decompression to avoid buffering the
entire decompressed message in memory, not because the size is fundamentally
unknowable.
```suggestion
* For compressed messages, this may be -1 when streaming
* decompression is used and the implementation avoids buffering
* the entire decompressed message in memory to determine its size.
```