Copilot commented on code in PR #15616: URL: https://github.com/apache/dubbo/pull/15616#discussion_r2268688177
########## dubbo-serialization/dubbo-serialization-hessian2/src/main/java/org/apache/dubbo/common/serialize/hessian2/Hessian2Serialization.java: ########## @@ -83,4 +83,71 @@ public ObjectInput deserialize(URL url, InputStream is) throws IOException { .getBean(Hessian2FactoryManager.class); return new Hessian2ObjectInput(is, hessian2FactoryManager); } + + @Override + public void serializeZeroCopy(URL url, Class<?> clz, Object obj, OutputStream outputStream) throws IOException { + Hessian2FactoryManager hessian2FactoryManager = Optional.ofNullable(url) + .map(URL::getOrDefaultFrameworkModel) + .orElseGet(FrameworkModel::defaultModel) + .getBeanFactory() + .getBean(Hessian2FactoryManager.class); + + // Check if we're in streaming mode (used with StreamingLengthEncoder) + boolean isStreamingMode = isStreamingMode(url); + + // Use Hessian2Output directly for zero-copy serialization + com.alibaba.com.caucho.hessian.io.Hessian2Output hessianOutput = + new com.alibaba.com.caucho.hessian.io.Hessian2Output(outputStream); + hessianOutput.setSerializerFactory(hessian2FactoryManager.getSerializerFactory( + Thread.currentThread().getContextClassLoader())); + + try { + if (isStreamingMode) { + // In streaming mode, write raw data without additional length prefixes + // since StreamingLengthEncoder already handles length prefixing + hessianOutput.writeObject(obj); + hessianOutput.flushBuffer(); + } else { + // In direct mode, use standard Hessian2 behavior with native formatting + hessianOutput.writeObject(obj); + hessianOutput.flushBuffer(); + } + } finally { + // Don't close the output stream as it belongs to the caller + // Just flush any remaining data + try { + hessianOutput.close(); + } catch (Exception e) { + // Ignore close errors, stream ownership belongs to caller Review Comment: The catch block silently ignores close exceptions with a generic comment. This could hide important errors during stream cleanup. Consider logging the exception or at least checking if it's a critical error type. ```suggestion logger.warn("Exception occurred while closing Hessian2Output stream.", e); ``` ########## dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ZeroCopyDirectUnpack.java: ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol.tri; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.serialize.ObjectInput; +import org.apache.dubbo.common.serialize.Serialization; +import org.apache.dubbo.rpc.model.UnPack; + +import java.io.ByteArrayInputStream; +import java.io.IOException; + +/** + * Generic zero-copy UnPack implementation that delegates to zero-copy capable serializations. + * This implementation bypasses the TripleWrapper layer and uses direct deserialization + * for better performance when the underlying serialization supports zero-copy. + */ +public class ZeroCopyDirectUnpack implements UnPack { + + private final URL url; + private final String serializeName; + private final Class<?>[] argumentTypes; + private final Serialization serialization; + + public ZeroCopyDirectUnpack(URL url, String serializeName, Class<?>[] argumentTypes) { + this.url = url; + this.serializeName = serializeName; + this.argumentTypes = argumentTypes; + + // Get the serialization via SPI + this.serialization = url.getOrDefaultFrameworkModel() + .getExtensionLoader(Serialization.class) + .getExtension(serializeName); + } + + @Override + public Object unpack(byte[] data) throws IOException, ClassNotFoundException { + if (argumentTypes.length == 1) { + // Single argument case - read length prefix first (same as ZeroCopyDirectPack) + ByteArrayInputStream bais = new ByteArrayInputStream(data); + + // Read length prefix (4 bytes) + int length = ((bais.read() & 0xFF) << 24) + | ((bais.read() & 0xFF) << 16) + | ((bais.read() & 0xFF) << 8) + | (bais.read() & 0xFF); Review Comment: The length validation checks for negative values but doesn't handle the case where available data is less than 4 bytes for the length prefix. If any of the bais.read() calls return -1, the bitwise operations will produce unexpected results. ```suggestion int length = readIntFromStream(bais); ``` ########## dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java: ########## @@ -86,23 +87,51 @@ public ReflectionPackableMethod( } boolean singleArgument = method.getRpcType() != MethodDescriptor.RpcType.UNARY; Review Comment: The logic appears inverted. For UNARY RPC calls, singleArgument should typically be false (multiple arguments possible), but this sets it to true for non-UNARY types. This contradicts the PR description which targets UNARY calls. ```suggestion boolean singleArgument = method.getRpcType() == MethodDescriptor.RpcType.UNARY; ``` ########## dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/WrapperStreamingDecoder.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol.tri; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.serialize.MultipleSerialization; +import org.apache.dubbo.common.serialize.ZeroCopyCapable; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * Wrapper streaming decoder that parses streaming data format created by WrapperStreamingEncoder. + * + * This decoder handles the format: [4-byte-length1][param1-data][4-byte-length2][param2-data]... + * Compatible with the 4-byte big-endian length prefix format used by StreamingLengthEncoder. + */ +public class WrapperStreamingDecoder { + + /** + * Decode streaming arguments data back to original objects. + * + * @param streamingData the encoded streaming data containing all parameters + * @param argumentTypes expected class types for each parameter + * @param baseSerializeType base serialization type (e.g., "hessian2", "fastjson2") + * @param url configuration URL for deserialization + * @param multipleSerialization serialization service for fallback deserialization + * @return decoded object array + * @throws IOException if deserialization fails + * @throws ClassNotFoundException if class cannot be found during deserialization + */ + public static Object[] decodeStreamingArgs( + byte[] streamingData, + Class<?>[] argumentTypes, + String baseSerializeType, + URL url, + MultipleSerialization multipleSerialization) + throws IOException, ClassNotFoundException { + + if (streamingData == null || argumentTypes == null) { + throw new IllegalArgumentException("Streaming data and argument types cannot be null"); + } + + if (streamingData.length == 0) { + return new Object[argumentTypes.length]; + } + + ByteArrayInputStream inputStream = new ByteArrayInputStream(streamingData); + Object[] results = new Object[argumentTypes.length]; + + for (int i = 0; i < argumentTypes.length; i++) { + // Read 4-byte big-endian length prefix (matches StreamingLengthEncoder format) + int length = readFixedLengthPrefix(inputStream); + + if (length == 0) { + // Handle null parameter + results[i] = null; + continue; + } + + // Read parameter data + byte[] paramData = new byte[length]; + int bytesRead = inputStream.read(paramData); + if (bytesRead != length) { + throw new IOException( + "Failed to read complete parameter data. Expected: " + length + ", actual: " + bytesRead); + } + + // Deserialize parameter using appropriate method + results[i] = + deserializeParameter(paramData, argumentTypes[i], baseSerializeType, url, multipleSerialization); + } + + return results; + } + + /** + * Read 4-byte fixed length prefix in big-endian format. + * Compatible with StreamingLengthEncoder.writeFixedLengthPrefix format. + */ + private static int readFixedLengthPrefix(InputStream inputStream) throws IOException { + int b1 = inputStream.read(); + int b2 = inputStream.read(); + int b3 = inputStream.read(); + int b4 = inputStream.read(); + + if (b1 < 0 || b2 < 0 || b3 < 0 || b4 < 0) { Review Comment: This validation correctly handles end-of-stream detection, but the same issue exists in ZeroCopyDirectUnpack where this validation is missing. ########## dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java: ########## @@ -407,13 +521,108 @@ public byte[] pack(Object obj) throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(); for (int i = 0; i < arguments.length; i++) { Object argument = arguments[i]; - multipleSerialization.serialize(url, serialize, actualRequestTypes[i], argument, bos); + multipleSerialization.serialize(url, originalSerialize, actualRequestTypes[i], argument, bos); builder.addArgs(bos.toByteArray()); bos.reset(); } return builder.build().toByteArray(); } + @Override + public void pack(Object obj, OutputStream outputStream) throws IOException, Exception { + Object[] arguments; + if (singleArgument) { + arguments = new Object[] {obj}; + } else { + arguments = (Object[]) obj; + } + + // Check if we should use zero-copy streaming mode + boolean shouldUseStreaming = shouldUseStreamingMode(); + String effectiveSerializeType = shouldUseStreaming ? serialize + "-streaming" : serialize; + + TripleCustomerProtocolWrapper.TripleRequestWrapper.Builder builder = + TripleCustomerProtocolWrapper.TripleRequestWrapper.Builder.newBuilder(); + builder.setSerializeType(effectiveSerializeType); + for (String type : argumentsType) { + builder.addArgTypes(type); + } + + if (actualRequestTypes == null || actualRequestTypes.length == 0) { + byte[] wrapperBytes = builder.build().toByteArray(); + outputStream.write(wrapperBytes); + return; + } + + if (shouldUseStreaming) { + // True zero-copy streaming path using existing StreamingLengthEncoder infrastructure + ByteArrayOutputStream streamingBuffer = new ByteArrayOutputStream(); + // Use original SPI name for actual serialization operations + WrapperStreamingEncoder.encodeStreamingArgs( + arguments, actualRequestTypes, originalSerialize, url, streamingBuffer); + + // Add single streaming data as one args element + builder.addArgs(streamingBuffer.toByteArray()); + } else { + // Traditional path for backward compatibility + // For streaming methods, arguments may include StreamObserver which is not serialized + int serializableArgumentCount = Math.min(arguments.length, actualRequestTypes.length); + ByteArrayOutputStream argumentStream = new ByteArrayOutputStream(); + for (int i = 0; i < serializableArgumentCount; i++) { + Object argument = arguments[i]; + multipleSerialization.serialize( + url, originalSerialize, actualRequestTypes[i], argument, argumentStream); + builder.addArgs(argumentStream.toByteArray()); + argumentStream.reset(); + } + } + + byte[] wrapperBytes = builder.build().toByteArray(); + outputStream.write(wrapperBytes); + } + + /** + * Check if we should use zero-copy streaming mode. + * This is based on RPC type compatibility and serialization capability. + */ + private boolean shouldUseStreamingMode() { + if (rpcType != MethodDescriptor.RpcType.UNARY) { Review Comment: The streaming mode logic correctly restricts to UNARY calls, but this conflicts with the earlier singleArgument assignment logic which seems inverted. ```suggestion if (rpcType == MethodDescriptor.RpcType.UNARY) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@dubbo.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@dubbo.apache.org For additional commands, e-mail: notifications-h...@dubbo.apache.org