BitoAgent commented on code in PR #13786: URL: https://github.com/apache/dubbo/pull/13786#discussion_r1552428992
########## dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/StreamingDecoder.java: ########## @@ -37,13 +37,6 @@ interface FragmentListener { */ void onFragmentMessage(InputStream rawMessage); - /** - * @param rawMessage raw message - */ - default void onFragmentMessage(InputStream dataHeader, InputStream rawMessage) { - onFragmentMessage(rawMessage); - } - default void onClose() {} Review Comment: **Issue**: Removing the default implementation of 'onFragmentMessage' that accepts two 'InputStream' arguments simplifies the interface but requires all implementers to handle stream merging themselves if needed. <br> **Fix**: Ensure that all current and future implementations of 'StreamingDecoder' are aware of this change and properly implement message fragment handling according to their specific requirements. <br> **Code Suggestion**: ``` Ensure that all current and future implementations of 'StreamingDecoder' properly implement message fragment handling. This involves overriding the 'onFragmentMessage' method to handle the merging of InputStreams if necessary. For guidance, refer to the updated interface definition and consider the implications of stream management in your specific context. 
``` ########## dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java: ########## @@ -130,16 +127,12 @@ private void deliver() { } private void processHeader() throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(lengthFieldOffset + lengthFieldLength); byte[] offsetData = new byte[lengthFieldOffset]; int ignore = accumulate.read(offsetData); - bos.write(offsetData); processOffset(new ByteArrayInputStream(offsetData), lengthFieldOffset); byte[] lengthBytes = new byte[lengthFieldLength]; ignore = accumulate.read(lengthBytes); - bos.write(lengthBytes); requiredLength = bytesToInt(lengthBytes); - this.dataHeader = new ByteArrayInputStream(bos.toByteArray()); // Continue reading the frame body. state = DecodeState.PAYLOAD; Review Comment: **Issue**: The removal of 'ByteArrayOutputStream' and direct processing of byte arrays from the 'InputStream' is a significant change that could impact how message headers are handled. Ensure that this change does not affect the protocol's expected behavior, especially in edge cases with large headers or payloads. <br> **Fix**: Thoroughly test the new implementation with various message sizes and formats to ensure compatibility with the expected HTTP/2 framing and encoding behaviors. 
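One way to exercise this kind of header handling in isolation is a round-trip check over several payload sizes. The sketch below is a stand-alone illustration, not Dubbo code: the class `LengthPrefixedFrameSketch`, its methods, and the assumed layout (a 1-byte offset region followed by a 4-byte big-endian length field) are hypothetical and only mirror the shape of `processHeader` and `bytesToInt` in the diff.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

// Hypothetical helper for illustration only; none of these names exist in Dubbo.
final class LengthPrefixedFrameSketch {

    // Assumed layout: 1 byte of offset data (a flag), then a 4-byte big-endian length.
    static final int OFFSET_BYTES = 1;
    static final int LENGTH_BYTES = 4;

    // Builds a frame: flag byte, big-endian payload length, then the payload itself.
    static byte[] frame(byte[] payload) {
        ByteArrayOutputStream out =
                new ByteArrayOutputStream(OFFSET_BYTES + LENGTH_BYTES + payload.length);
        out.write(0); // flag byte, assumed to mean "uncompressed"
        int length = payload.length;
        out.write((length >> 24) & 0xFF);
        out.write((length >> 16) & 0xFF);
        out.write((length >> 8) & 0xFF);
        out.write(length & 0xFF);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    // Parses a complete frame and returns a copy of its payload.
    static byte[] unframe(byte[] frame) {
        int headerLength = OFFSET_BYTES + LENGTH_BYTES;
        if (frame.length < headerLength) {
            throw new IllegalArgumentException("truncated header");
        }
        int required = bytesToInt(Arrays.copyOfRange(frame, OFFSET_BYTES, headerLength));
        if (required < 0 || frame.length - headerLength < required) {
            throw new IllegalArgumentException("truncated payload");
        }
        return Arrays.copyOfRange(frame, headerLength, headerLength + required);
    }

    // Interprets four bytes as a big-endian int.
    static int bytesToInt(byte[] b) {
        return ((b[0] & 0xFF) << 24) | ((b[1] & 0xFF) << 16) | ((b[2] & 0xFF) << 8) | (b[3] & 0xFF);
    }

    public static void main(String[] args) {
        // Sizes chosen to cross each byte boundary of the length field's lower bytes.
        int[] sizes = {0, 1, 255, 256, 65_536, 1 << 20};
        for (int size : sizes) {
            byte[] payload = new byte[size];
            Arrays.fill(payload, (byte) 0x5A);
            if (!Arrays.equals(payload, unframe(frame(payload)))) {
                throw new AssertionError("round trip failed for size " + size);
            }
        }
        System.out.println("all sizes round-tripped");
    }
}
```

The sketch checks for truncated input explicitly, whereas the diff ignores the return value of `accumulate.read(...)`; whether a short read is possible there depends on how `accumulate` is filled, which this example does not model.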
<br> **Code Suggestion**: ``` - ByteArrayOutputStream bos = new ByteArrayOutputStream(lengthFieldOffset + lengthFieldLength); byte[] offsetData = new byte[lengthFieldOffset]; int ignore = accumulate.read(offsetData); - bos.write(offsetData); processOffset(new ByteArrayInputStream(offsetData), lengthFieldOffset); byte[] lengthBytes = new byte[lengthFieldLength]; ignore = accumulate.read(lengthBytes); - bos.write(lengthBytes); requiredLength = bytesToInt(lengthBytes); - this.dataHeader = new ByteArrayInputStream(bos.toByteArray()); ``` ########## dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java: ########## @@ -167,8 +160,8 @@ private void processBody() throws IOException { requiredLength = lengthFieldOffset + lengthFieldLength; } - protected void invokeListener(InputStream inputStream) { - this.listener.onFragmentMessage(dataHeader, inputStream); + public void invokeListener(InputStream inputStream) { + this.listener.onFragmentMessage(inputStream); Review Comment: **Issue**: Changing the visibility of 'invokeListener' from protected to public increases the method's exposure, potentially allowing unintended interactions. Review if this change is necessary for the intended design and consider potential security implications. <br> **Fix**: If the method needs to be exposed publicly for legitimate reasons, ensure that its usage does not introduce security vulnerabilities or allow misuse. 
<br> **Code Suggestion**: ``` - protected void invokeListener(InputStream inputStream) { + public void invokeListener(InputStream inputStream) { ``` ########## dubbo-common/src/main/java/org/apache/dubbo/config/AbstractInterfaceConfig.java: ########## @@ -252,7 +252,7 @@ protected void postProcessAfterScopeModelChanged(ScopeModel oldScopeModel, Scope } if (CollectionUtils.isNotEmpty(this.registries)) { this.registries.forEach(registryConfig -> { - if (registryConfig.getScopeModel() != applicationModel) { + if (registryConfig != null && registryConfig.getScopeModel() != applicationModel) { Review Comment: **Issue**: Adding null check 'registryConfig != null' before accessing 'registryConfig.getScopeModel()' enhances the robustness of the code. However, it's essential to ensure that this additional check aligns with the expected logic and does not hide potential issues where 'registryConfig' should not be null. <br> **Fix**: Validate that the added null check is indeed required and does not mask a scenario where a null 'registryConfig' indicates a misconfiguration or a bug elsewhere in the code. <br> **Code Suggestion**: ``` Validate that the added null check is indeed required and does not mask a scenario where a null 'registryConfig' indicates a misconfiguration or a bug elsewhere in the code. 
``` ########## dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java: ########## @@ -16,123 +16,103 @@ */ package org.apache.dubbo.rpc.protocol.tri.h12.grpc; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.config.ConfigurationUtils; +import org.apache.dubbo.common.io.StreamUtils; +import org.apache.dubbo.common.utils.ArrayUtils; import org.apache.dubbo.remoting.http12.exception.DecodeException; import org.apache.dubbo.remoting.http12.exception.EncodeException; import org.apache.dubbo.remoting.http12.message.HttpMessageCodec; import org.apache.dubbo.remoting.http12.message.MediaType; +import org.apache.dubbo.rpc.model.FrameworkModel; +import org.apache.dubbo.rpc.model.MethodDescriptor; +import org.apache.dubbo.rpc.model.PackableMethod; +import org.apache.dubbo.rpc.model.PackableMethodFactory; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.Charset; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; -import com.google.protobuf.Message; - -import static org.apache.dubbo.common.constants.CommonConstants.PROTOBUF_MESSAGE_CLASS_NAME; +import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_PACKABLE_METHOD_FACTORY; public class GrpcCompositeCodec implements HttpMessageCodec { - private final ProtobufHttpMessageCodec protobufHttpMessageCodec; + private static final String PACKABLE_METHOD_CACHE = "PACKABLE_METHOD_CACHE"; - private final WrapperHttpMessageCodec wrapperHttpMessageCodec; + private final URL url; - public GrpcCompositeCodec( - ProtobufHttpMessageCodec protobufHttpMessageCodec, WrapperHttpMessageCodec wrapperHttpMessageCodec) { - this.protobufHttpMessageCodec = protobufHttpMessageCodec; - this.wrapperHttpMessageCodec = wrapperHttpMessageCodec; - } + private final FrameworkModel frameworkModel; + + private final String 
mediaType; - public void setEncodeTypes(Class<?>[] encodeTypes) { - this.wrapperHttpMessageCodec.setEncodeTypes(encodeTypes); + private PackableMethod packableMethod; + + public GrpcCompositeCodec(URL url, FrameworkModel frameworkModel, String mediaType) { + this.url = url; + this.frameworkModel = frameworkModel; + this.mediaType = mediaType; } - public void setDecodeTypes(Class<?>[] decodeTypes) { - this.wrapperHttpMessageCodec.setDecodeTypes(decodeTypes); + public void loadPackableMethod(MethodDescriptor methodDescriptor) { + if (methodDescriptor instanceof PackableMethod) { + packableMethod = (PackableMethod) methodDescriptor; + return; + } + Map<MethodDescriptor, PackableMethod> cacheMap = (Map<MethodDescriptor, PackableMethod>) url.getServiceModel() + .getServiceMetadata() + .getAttributeMap() + .computeIfAbsent(PACKABLE_METHOD_CACHE, k -> new ConcurrentHashMap<>()); + packableMethod = cacheMap.computeIfAbsent(methodDescriptor, md -> frameworkModel + .getExtensionLoader(PackableMethodFactory.class) + .getExtension(ConfigurationUtils.getGlobalConfiguration(url.getApplicationModel()) + .getString(DUBBO_PACKABLE_METHOD_FACTORY, DEFAULT_KEY)) + .create(methodDescriptor, url, mediaType)); } @Override public void encode(OutputStream outputStream, Object data, Charset charset) throws EncodeException { - // protobuf - // TODO int compressed = Identity.MESSAGE_ENCODING.equals(requestMetadata.compressor.getMessageEncoding()) ? 
0 : - // 1; try { - int compressed = 0; - outputStream.write(compressed); - if (isProtobuf(data)) { - ProtobufWriter.write(protobufHttpMessageCodec, outputStream, data); - return; - } - // wrapper - wrapperHttpMessageCodec.encode(outputStream, data); - } catch (IOException e) { + outputStream.write(0); + byte[] bytes = packableMethod.packResponse(data); + writeLength(outputStream, bytes.length); + outputStream.write(bytes); + } catch (Exception e) { throw new EncodeException(e); } } @Override public Object decode(InputStream inputStream, Class<?> targetType, Charset charset) throws DecodeException { - if (isProtoClass(targetType)) { - return protobufHttpMessageCodec.decode(inputStream, targetType, charset); + try { + byte[] data = StreamUtils.readBytes(inputStream); + return packableMethod.parseRequest(data); + } catch (Exception e) { + throw new DecodeException(e); } - return wrapperHttpMessageCodec.decode(inputStream, targetType, charset); } @Override public Object[] decode(InputStream inputStream, Class<?>[] targetTypes, Charset charset) throws DecodeException { - if (targetTypes.length > 1) { - return wrapperHttpMessageCodec.decode(inputStream, targetTypes, charset); - } - return HttpMessageCodec.super.decode(inputStream, targetTypes, charset); - } - - private static void writeLength(OutputStream outputStream, int length) { - try { - outputStream.write(((length >> 24) & 0xFF)); - outputStream.write(((length >> 16) & 0xFF)); - outputStream.write(((length >> 8) & 0xFF)); - outputStream.write((length & 0xFF)); - } catch (IOException e) { - throw new EncodeException(e); + Object message = decode(inputStream, ArrayUtils.isEmpty(targetTypes) ? 
null : targetTypes[0], charset); + if (message instanceof Object[]) { + return (Object[]) message; } + return new Object[] {message}; } @Override public MediaType mediaType() { return MediaType.APPLICATION_GRPC; } - private static boolean isProtobuf(Object data) { - if (data == null) { - return false; - } - return isProtoClass(data.getClass()); - } - - private static boolean isProtoClass(Class<?> clazz) { - while (clazz != Object.class && clazz != null) { - Class<?>[] interfaces = clazz.getInterfaces(); - if (interfaces.length > 0) { - for (Class<?> clazzInterface : interfaces) { - if (PROTOBUF_MESSAGE_CLASS_NAME.equalsIgnoreCase(clazzInterface.getName())) { - return true; - } - } - } - clazz = clazz.getSuperclass(); - } - return false; - } - - /** - * lazy init protobuf class - */ - private static class ProtobufWriter { - - private static void write(HttpMessageCodec codec, OutputStream outputStream, Object data) { - int serializedSize = ((Message) data).getSerializedSize(); - // write length - writeLength(outputStream, serializedSize); - codec.encode(outputStream, data); - } + private void writeLength(OutputStream outputStream, int length) throws IOException { + outputStream.write(((length >> 24) & 0xFF)); + outputStream.write(((length >> 16) & 0xFF)); + outputStream.write(((length >> 8) & 0xFF)); + outputStream.write((length & 0xFF)); Review Comment: **Performance Issue**: Refactoring the codec to use a more streamlined approach for handling encoding and decoding, focusing on performance and maintainability. <br> **Fix**: Refactor the codec implementation to focus on performance by using a more efficient method for encoding and decoding, ensuring thread safety and reducing complexity. <br> **Code Suggestion**: ``` Refactor the codec implementation in GrpcCompositeCodec.java to improve performance and maintainability. Focus on streamlining the approach for handling encoding and decoding. Ensure the new implementation is thread-safe and reduces complexity. 
``` ########## dubbo-common/src/main/java/org/apache/dubbo/config/AbstractInterfaceConfig.java: ########## @@ -252,7 +252,7 @@ protected void postProcessAfterScopeModelChanged(ScopeModel oldScopeModel, Scope } if (CollectionUtils.isNotEmpty(this.registries)) { this.registries.forEach(registryConfig -> { - if (registryConfig.getScopeModel() != applicationModel) { + if (registryConfig != null && registryConfig.getScopeModel() != applicationModel) { Review Comment: **Performance Issue**: Potential null pointer access: this conditional expression can lead to a null pointer exception. <br> **Fix**: Ensure 'registryConfig' is not null before accessing its methods to avoid NullPointerException. <br> **Code Suggestion**: ``` - if (registryConfig.getScopeModel() != applicationModel) { + if (registryConfig != null && registryConfig.getScopeModel() != applicationModel) { ``` ########## dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java: ########## @@ -16,123 +16,103 @@ */ package org.apache.dubbo.rpc.protocol.tri.h12.grpc; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.config.ConfigurationUtils; +import org.apache.dubbo.common.io.StreamUtils; +import org.apache.dubbo.common.utils.ArrayUtils; import org.apache.dubbo.remoting.http12.exception.DecodeException; import org.apache.dubbo.remoting.http12.exception.EncodeException; import org.apache.dubbo.remoting.http12.message.HttpMessageCodec; import org.apache.dubbo.remoting.http12.message.MediaType; +import org.apache.dubbo.rpc.model.FrameworkModel; +import org.apache.dubbo.rpc.model.MethodDescriptor; +import org.apache.dubbo.rpc.model.PackableMethod; +import org.apache.dubbo.rpc.model.PackableMethodFactory; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.Charset; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; -import com.google.protobuf.Message; - -import static 
org.apache.dubbo.common.constants.CommonConstants.PROTOBUF_MESSAGE_CLASS_NAME; +import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_PACKABLE_METHOD_FACTORY; public class GrpcCompositeCodec implements HttpMessageCodec { - private final ProtobufHttpMessageCodec protobufHttpMessageCodec; + private static final String PACKABLE_METHOD_CACHE = "PACKABLE_METHOD_CACHE"; - private final WrapperHttpMessageCodec wrapperHttpMessageCodec; + private final URL url; - public GrpcCompositeCodec( - ProtobufHttpMessageCodec protobufHttpMessageCodec, WrapperHttpMessageCodec wrapperHttpMessageCodec) { - this.protobufHttpMessageCodec = protobufHttpMessageCodec; - this.wrapperHttpMessageCodec = wrapperHttpMessageCodec; - } + private final FrameworkModel frameworkModel; + + private final String mediaType; - public void setEncodeTypes(Class<?>[] encodeTypes) { - this.wrapperHttpMessageCodec.setEncodeTypes(encodeTypes); + private PackableMethod packableMethod; + + public GrpcCompositeCodec(URL url, FrameworkModel frameworkModel, String mediaType) { + this.url = url; + this.frameworkModel = frameworkModel; + this.mediaType = mediaType; } - public void setDecodeTypes(Class<?>[] decodeTypes) { - this.wrapperHttpMessageCodec.setDecodeTypes(decodeTypes); + public void loadPackableMethod(MethodDescriptor methodDescriptor) { + if (methodDescriptor instanceof PackableMethod) { + packableMethod = (PackableMethod) methodDescriptor; + return; + } + Map<MethodDescriptor, PackableMethod> cacheMap = (Map<MethodDescriptor, PackableMethod>) url.getServiceModel() + .getServiceMetadata() + .getAttributeMap() + .computeIfAbsent(PACKABLE_METHOD_CACHE, k -> new ConcurrentHashMap<>()); + packableMethod = cacheMap.computeIfAbsent(methodDescriptor, md -> frameworkModel + .getExtensionLoader(PackableMethodFactory.class) + .getExtension(ConfigurationUtils.getGlobalConfiguration(url.getApplicationModel()) + 
.getString(DUBBO_PACKABLE_METHOD_FACTORY, DEFAULT_KEY)) + .create(methodDescriptor, url, mediaType)); } @Override public void encode(OutputStream outputStream, Object data, Charset charset) throws EncodeException { - // protobuf - // TODO int compressed = Identity.MESSAGE_ENCODING.equals(requestMetadata.compressor.getMessageEncoding()) ? 0 : - // 1; try { - int compressed = 0; - outputStream.write(compressed); - if (isProtobuf(data)) { - ProtobufWriter.write(protobufHttpMessageCodec, outputStream, data); - return; - } - // wrapper - wrapperHttpMessageCodec.encode(outputStream, data); - } catch (IOException e) { + outputStream.write(0); Review Comment: **Security Issue**: Direct writing of unvalidated data to an OutputStream could lead to data integrity issues. Ensure data is validated or sanitized. <br> **Fix**: Validate or sanitize output data before writing to an OutputStream to prevent data integrity issues. <br> **Code Suggestion**: ``` In GrpcCompositeCodec.java, before writing data to an OutputStream, implement a validation or sanitization process to ensure the integrity of the data being written. ``` ########## dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java: ########## @@ -167,8 +160,8 @@ private void processBody() throws IOException { requiredLength = lengthFieldOffset + lengthFieldLength; } - protected void invokeListener(InputStream inputStream) { - this.listener.onFragmentMessage(dataHeader, inputStream); + public void invokeListener(InputStream inputStream) { Review Comment: **Security Issue**: Public method 'invokeListener' should not expose its internal InputStream directly, consider copying the data or using a safer access method. <br> **Fix**: Ensure encapsulation of internal streams to prevent unauthorized stream manipulation. 
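If copying the data were the preferred approach, a minimal sketch could look like the following. It is an assumption-laden illustration rather than a change proposed in the PR: `StreamCopySketch` and `detachedCopy` are hypothetical names, and the helper simply drains the source into a byte array so the listener receives a stream that is independent of the decoder's internal buffer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical helper for illustration only; not part of Dubbo.
final class StreamCopySketch {

    // Reads the source to its end and returns an in-memory stream over a private copy.
    static ByteArrayInputStream detachedCopy(InputStream source) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int read;
            while ((read = source.read(chunk)) != -1) {
                buffer.write(chunk, 0, read);
            }
            // toByteArray() returns a fresh array, so later writes to buffer cannot leak out.
            return new ByteArrayInputStream(buffer.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The trade-off is one extra allocation and copy per message, which may matter on a hot decoding path; whether that cost is justified depends on who can actually call `invokeListener`.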
<br> **Code Suggestion**: ``` - protected void invokeListener(InputStream inputStream) { + public void invokeListener(InputStream inputStream) { ``` ########## dubbo-common/src/main/java/org/apache/dubbo/config/AbstractInterfaceConfig.java: ########## @@ -252,7 +252,7 @@ protected void postProcessAfterScopeModelChanged(ScopeModel oldScopeModel, Scope } if (CollectionUtils.isNotEmpty(this.registries)) { this.registries.forEach(registryConfig -> { - if (registryConfig.getScopeModel() != applicationModel) { + if (registryConfig != null && registryConfig.getScopeModel() != applicationModel) { Review Comment: **Security Issue**: Null check added for registryConfig before accessing its methods could prevent potential NullPointerException. <br> **Fix**: Ensure that objects are checked for null before method access to prevent NullPointerException. <br> **Code Suggestion**: ``` - return StringUtils.isEmpty(getGroup()) - ? (interfaceConfig != null ? interfaceConfig.getGroup() : getGroup()) - : getGroup(); + if (StringUtils.isEmpty(getGroup()) && interfaceConfig != null) { + return interfaceConfig.getGroup(); + } + return getGroup(); ``` ########## dubbo-common/src/main/java/org/apache/dubbo/config/AbstractInterfaceConfig.java: ########## @@ -252,7 +252,7 @@ protected void postProcessAfterScopeModelChanged(ScopeModel oldScopeModel, Scope } if (CollectionUtils.isNotEmpty(this.registries)) { this.registries.forEach(registryConfig -> { - if (registryConfig.getScopeModel() != applicationModel) { + if (registryConfig != null && registryConfig.getScopeModel() != applicationModel) { registryConfig.setScopeModel(applicationModel); } }); Review Comment: **Scalability Issue**: Null check added for registryConfig in the lambda expression. This change prevents potential NullPointerExceptions when iterating over registries, enhancing the robustness of the code, especially under scaling scenarios where dynamic registry updates might lead to temporary inconsistencies. 
<br> **Fix**: The fix properly handles cases where registryConfig might be null, ensuring that the system remains resilient and functional even when facing transient state inconsistencies during scaling operations or registry updates. <br> **Code Suggestion**: ``` - if (registryConfig.getScopeModel() != applicationModel) { + if (registryConfig != null && registryConfig.getScopeModel() != applicationModel) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@dubbo.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org