Copilot commented on code in PR #16072:
URL: https://github.com/apache/dubbo/pull/16072#discussion_r2767687490
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/DataQueueCommand.java:
##########
@@ -16,50 +16,94 @@
*/
package org.apache.dubbo.rpc.protocol.tri.command;
+import org.apache.dubbo.common.io.StreamUtils;
+import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
+import org.apache.dubbo.rpc.protocol.tri.compressor.Identity;
import org.apache.dubbo.rpc.protocol.tri.stream.TripleStreamChannelFuture;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
public class DataQueueCommand extends StreamQueueCommand {
- private final byte[] data;
+ private final InputStream dataStream;
- private final int compressFlag;
+ private final Compressor compressor;
private final boolean endStream;
private DataQueueCommand(
- TripleStreamChannelFuture streamChannelFuture, byte[] data, int
compressFlag, boolean endStream) {
+ TripleStreamChannelFuture streamChannelFuture,
+ InputStream dataStream,
+ Compressor compressor,
+ boolean endStream) {
super(streamChannelFuture);
- this.data = data;
- this.compressFlag = compressFlag;
+ this.dataStream = dataStream;
+ this.compressor = compressor;
this.endStream = endStream;
}
public static DataQueueCommand create(
- TripleStreamChannelFuture streamChannelFuture, byte[] data,
boolean endStream, int compressFlag) {
- return new DataQueueCommand(streamChannelFuture, data, compressFlag,
endStream);
+ TripleStreamChannelFuture streamChannelFuture,
+ InputStream dataStream,
+ boolean endStream,
+ Compressor compressor) {
+ return new DataQueueCommand(streamChannelFuture, dataStream,
compressor, endStream);
}
+ /**
+ * Send data frame to the channel.
+ *
+ * <p>gRPC message frame format:
+ * <pre>
+ * +----------------------+
+ * | Compressed-Flag (1B) | 0 = uncompressed, 1 = compressed
+ * +----------------------+
+ * | Message-Length (4B) | big-endian unsigned integer
+ * +----------------------+
+ * | Message Data (N) | compressed or uncompressed payload
+ * +----------------------+
+ * </pre>
+ */
@Override
public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) {
- if (data == null) {
+ if (dataStream == null) {
ctx.write(new DefaultHttp2DataFrame(endStream), promise);
} else {
ByteBuf buf = ctx.alloc().buffer();
+ // Write compression flag (1 byte): 0 for identity, 1 for
compressed
+ int compressFlag =
Identity.MESSAGE_ENCODING.equals(compressor.getMessageEncoding()) ? 0 : 1;
buf.writeByte(compressFlag);
- buf.writeInt(data.length);
- buf.writeBytes(data);
+ // Record position for length field, write placeholder (4 bytes)
+ int lengthIndex = buf.writerIndex();
+ buf.writeInt(0);
+ try {
+ // Compress and write data directly into ByteBuf using
decorator pattern
+ ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
+ OutputStream compressedOut = compressor.decorate(bbos);
+ StreamUtils.copy(dataStream, compressedOut);
+ compressedOut.close();
Review Comment:
The dataStream passed to this method is not closed after being read. This
can lead to resource leaks, especially for file-based InputStreams or other
InputStreams that hold system resources. Consider wrapping the stream
operations in a try-with-resources block or explicitly closing the dataStream
in the finally block after copying is complete.
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java:
##########
@@ -198,16 +199,23 @@ public SSLSession getSslSession() {
}
@Override
- public ChannelFuture sendMessage(byte[] message, int compressFlag) {
+ public ChannelFuture sendMessage(InputStream message, Compressor
compressor) {
ChannelFuture checkResult = preCheck();
if (!checkResult.isSuccess()) {
return checkResult;
}
- final int messageSize = message.length;
+ // Estimate message size for flow control
+ int estimatedSize;
+ try {
+ estimatedSize = message.available();
+ } catch (IOException e) {
+ estimatedSize = 0;
+ }
Review Comment:
The flow control size estimation using message.available() is unreliable for
general InputStream types. The available() method only returns an estimate of
bytes that can be read without blocking, not the total stream size. While this
works correctly for ByteArrayInputStream (which is what's currently used in
TripleClientCall), it could cause incorrect flow control accounting if other
InputStream types are passed to sendMessage in the future. Consider documenting
this limitation in the method's javadoc or adding validation to ensure only
ByteArrayInputStream is accepted.
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/DataQueueCommand.java:
##########
@@ -16,50 +16,94 @@
*/
package org.apache.dubbo.rpc.protocol.tri.command;
+import org.apache.dubbo.common.io.StreamUtils;
+import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
+import org.apache.dubbo.rpc.protocol.tri.compressor.Identity;
import org.apache.dubbo.rpc.protocol.tri.stream.TripleStreamChannelFuture;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
public class DataQueueCommand extends StreamQueueCommand {
- private final byte[] data;
+ private final InputStream dataStream;
- private final int compressFlag;
+ private final Compressor compressor;
private final boolean endStream;
private DataQueueCommand(
- TripleStreamChannelFuture streamChannelFuture, byte[] data, int
compressFlag, boolean endStream) {
+ TripleStreamChannelFuture streamChannelFuture,
+ InputStream dataStream,
+ Compressor compressor,
+ boolean endStream) {
super(streamChannelFuture);
- this.data = data;
- this.compressFlag = compressFlag;
+ this.dataStream = dataStream;
+ this.compressor = compressor;
this.endStream = endStream;
}
public static DataQueueCommand create(
- TripleStreamChannelFuture streamChannelFuture, byte[] data,
boolean endStream, int compressFlag) {
- return new DataQueueCommand(streamChannelFuture, data, compressFlag,
endStream);
+ TripleStreamChannelFuture streamChannelFuture,
+ InputStream dataStream,
+ boolean endStream,
+ Compressor compressor) {
+ return new DataQueueCommand(streamChannelFuture, dataStream,
compressor, endStream);
}
+ /**
+ * Send data frame to the channel.
+ *
+ * <p>gRPC message frame format:
+ * <pre>
+ * +----------------------+
+ * | Compressed-Flag (1B) | 0 = uncompressed, 1 = compressed
+ * +----------------------+
+ * | Message-Length (4B) | big-endian unsigned integer
+ * +----------------------+
+ * | Message Data (N) | compressed or uncompressed payload
+ * +----------------------+
+ * </pre>
+ */
@Override
public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) {
- if (data == null) {
+ if (dataStream == null) {
ctx.write(new DefaultHttp2DataFrame(endStream), promise);
} else {
ByteBuf buf = ctx.alloc().buffer();
+ // Write compression flag (1 byte): 0 for identity, 1 for
compressed
+ int compressFlag =
Identity.MESSAGE_ENCODING.equals(compressor.getMessageEncoding()) ? 0 : 1;
buf.writeByte(compressFlag);
- buf.writeInt(data.length);
- buf.writeBytes(data);
+ // Record position for length field, write placeholder (4 bytes)
+ int lengthIndex = buf.writerIndex();
+ buf.writeInt(0);
+ try {
+ // Compress and write data directly into ByteBuf using
decorator pattern
+ ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
+ OutputStream compressedOut = compressor.decorate(bbos);
+ StreamUtils.copy(dataStream, compressedOut);
+ compressedOut.close();
+ // Calculate actual message length: total written bytes minus
the 4-byte length field itself
+ int written = buf.writerIndex() - lengthIndex - 4;
+ buf.setInt(lengthIndex, written);
+ } catch (Exception e) {
+ buf.release();
+ promise.setFailure(e);
+ return;
+ }
Review Comment:
When an exception occurs during compression/copying, the ByteBuf is released
but the dataStream is not closed. This can lead to resource leaks. The
dataStream should also be closed in the catch block to ensure proper cleanup of
any system resources it may hold.
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/ClientStream.java:
##########
@@ -73,12 +75,14 @@ default void onReady() {}
void initStream();
/**
- * Send message to remote peer.
+ * Send message to remote peer with zero-copy compression.
+ * The compressor decorates the output stream to compress data directly
into ByteBuf.
*
- * @param message message to send to remote peer
- * @return future to callback when send message is done
+ * @param message raw message stream (uncompressed)
+ * @param compressor compressor to use for compression
+ * @return future to callback when send is done
*/
- Future<?> sendMessage(byte[] message, int compressFlag);
+ Future<?> sendMessage(InputStream message, Compressor compressor);
Review Comment:
The sendMessage method signature has been changed from sendMessage(byte[],
int) to sendMessage(InputStream, Compressor), which is a breaking API change.
If ClientStream is considered a public or stable API, the old method should be
deprecated first rather than removed directly, or this should be clearly
documented as a breaking change in the PR description. If this is an
internal-only API, consider adding package-private visibility or an @Internal
annotation to make this clear.
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/DataQueueCommand.java:
##########
@@ -16,50 +16,94 @@
*/
package org.apache.dubbo.rpc.protocol.tri.command;
+import org.apache.dubbo.common.io.StreamUtils;
+import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
+import org.apache.dubbo.rpc.protocol.tri.compressor.Identity;
import org.apache.dubbo.rpc.protocol.tri.stream.TripleStreamChannelFuture;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
public class DataQueueCommand extends StreamQueueCommand {
- private final byte[] data;
+ private final InputStream dataStream;
- private final int compressFlag;
+ private final Compressor compressor;
private final boolean endStream;
private DataQueueCommand(
- TripleStreamChannelFuture streamChannelFuture, byte[] data, int
compressFlag, boolean endStream) {
+ TripleStreamChannelFuture streamChannelFuture,
+ InputStream dataStream,
+ Compressor compressor,
+ boolean endStream) {
super(streamChannelFuture);
- this.data = data;
- this.compressFlag = compressFlag;
+ this.dataStream = dataStream;
+ this.compressor = compressor;
this.endStream = endStream;
}
public static DataQueueCommand create(
- TripleStreamChannelFuture streamChannelFuture, byte[] data,
boolean endStream, int compressFlag) {
- return new DataQueueCommand(streamChannelFuture, data, compressFlag,
endStream);
+ TripleStreamChannelFuture streamChannelFuture,
+ InputStream dataStream,
+ boolean endStream,
+ Compressor compressor) {
+ return new DataQueueCommand(streamChannelFuture, dataStream,
compressor, endStream);
}
+ /**
+ * Send data frame to the channel.
+ *
+ * <p>gRPC message frame format:
+ * <pre>
+ * +----------------------+
+ * | Compressed-Flag (1B) | 0 = uncompressed, 1 = compressed
+ * +----------------------+
+ * | Message-Length (4B) | big-endian unsigned integer
+ * +----------------------+
+ * | Message Data (N) | compressed or uncompressed payload
+ * +----------------------+
+ * </pre>
+ */
@Override
public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) {
- if (data == null) {
+ if (dataStream == null) {
ctx.write(new DefaultHttp2DataFrame(endStream), promise);
} else {
ByteBuf buf = ctx.alloc().buffer();
+ // Write compression flag (1 byte): 0 for identity, 1 for
compressed
+ int compressFlag =
Identity.MESSAGE_ENCODING.equals(compressor.getMessageEncoding()) ? 0 : 1;
buf.writeByte(compressFlag);
- buf.writeInt(data.length);
- buf.writeBytes(data);
+ // Record position for length field, write placeholder (4 bytes)
+ int lengthIndex = buf.writerIndex();
+ buf.writeInt(0);
+ try {
+ // Compress and write data directly into ByteBuf using
decorator pattern
+ ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
+ OutputStream compressedOut = compressor.decorate(bbos);
+ StreamUtils.copy(dataStream, compressedOut);
+ compressedOut.close();
Review Comment:
This ByteBufOutputStream is not always closed on method exit.
```suggestion
try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
OutputStream compressedOut = compressor.decorate(bbos))
{
StreamUtils.copy(dataStream, compressedOut);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]