BitoAgent commented on code in PR #13786:
URL: https://github.com/apache/dubbo/pull/13786#discussion_r1552428992


##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/StreamingDecoder.java:
##########
@@ -37,13 +37,6 @@ interface FragmentListener {
          */
         void onFragmentMessage(InputStream rawMessage);
 
-        /**
-         * @param rawMessage raw message
-         */
-        default void onFragmentMessage(InputStream dataHeader, InputStream 
rawMessage) {
-            onFragmentMessage(rawMessage);
-        }
-
         default void onClose() {}

Review Comment:
    **Issue**: Removing the default implementation of 'onFragmentMessage' that 
accepts two 'InputStream' arguments simplifies the interface but requires all 
implementers to handle stream merging themselves if needed. <br> **Fix**: 
Ensure that all current and future implementations of 'StreamingDecoder' are 
aware of this change and properly implement message fragment handling according 
to their specific requirements. <br> **Code Suggestion**: 
    ```
    Ensure that all current and future implementations of 'StreamingDecoder' 
properly implement message fragment handling. This involves overriding the 
'onFragmentMessage' method to handle the merging of InputStreams if necessary. 
For guidance, refer to the updated interface definition and consider the 
implications of stream management in your specific context.
    ```
   
   



##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java:
##########
@@ -130,16 +127,12 @@ private void deliver() {
     }
 
     private void processHeader() throws IOException {
-        ByteArrayOutputStream bos = new 
ByteArrayOutputStream(lengthFieldOffset + lengthFieldLength);
         byte[] offsetData = new byte[lengthFieldOffset];
         int ignore = accumulate.read(offsetData);
-        bos.write(offsetData);
         processOffset(new ByteArrayInputStream(offsetData), lengthFieldOffset);
         byte[] lengthBytes = new byte[lengthFieldLength];
         ignore = accumulate.read(lengthBytes);
-        bos.write(lengthBytes);
         requiredLength = bytesToInt(lengthBytes);
-        this.dataHeader = new ByteArrayInputStream(bos.toByteArray());
 
         // Continue reading the frame body.
         state = DecodeState.PAYLOAD;

Review Comment:
    **Issue**: The removal of 'ByteArrayOutputStream' and direct processing of 
byte arrays from the 'InputStream' is a significant change that could impact 
how message headers are handled. Ensure that this change does not affect the 
protocol's expected behavior, especially in edge cases with large headers or 
payloads. <br> **Fix**: Thoroughly test the new implementation with various 
message sizes and formats to ensure compatibility with the expected HTTP/2 
framing and encoding behaviors. <br> **Code Suggestion**: 
    ```
    -        ByteArrayOutputStream bos = new 
ByteArrayOutputStream(lengthFieldOffset + lengthFieldLength);
             byte[] offsetData = new byte[lengthFieldOffset];
             int ignore = accumulate.read(offsetData);
    -        bos.write(offsetData);
             processOffset(new ByteArrayInputStream(offsetData), 
lengthFieldOffset);
             byte[] lengthBytes = new byte[lengthFieldLength];
             ignore = accumulate.read(lengthBytes);
    -        bos.write(lengthBytes);
             requiredLength = bytesToInt(lengthBytes);
    -        this.dataHeader = new ByteArrayInputStream(bos.toByteArray());
    ```
   
   



##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java:
##########
@@ -167,8 +160,8 @@ private void processBody() throws IOException {
         requiredLength = lengthFieldOffset + lengthFieldLength;
     }
 
-    protected void invokeListener(InputStream inputStream) {
-        this.listener.onFragmentMessage(dataHeader, inputStream);
+    public void invokeListener(InputStream inputStream) {
+        this.listener.onFragmentMessage(inputStream);

Review Comment:
    **Issue**: Changing the visibility of 'invokeListener' from protected to 
public increases the method's exposure, potentially allowing unintended 
interactions. Review if this change is necessary for the intended design and 
consider potential security implications. <br> **Fix**: If the method needs to 
be exposed publicly for legitimate reasons, ensure that its usage does not 
introduce security vulnerabilities or allow misuse. <br> **Code Suggestion**: 
    ```
    -    protected void invokeListener(InputStream inputStream) {
    +    public void invokeListener(InputStream inputStream) {
    ```
   
   



##########
dubbo-common/src/main/java/org/apache/dubbo/config/AbstractInterfaceConfig.java:
##########
@@ -252,7 +252,7 @@ protected void postProcessAfterScopeModelChanged(ScopeModel 
oldScopeModel, Scope
         }
         if (CollectionUtils.isNotEmpty(this.registries)) {
             this.registries.forEach(registryConfig -> {
-                if (registryConfig.getScopeModel() != applicationModel) {
+                if (registryConfig != null && registryConfig.getScopeModel() 
!= applicationModel) {

Review Comment:
    **Issue**: Adding null check 'registryConfig != null' before accessing 
'registryConfig.getScopeModel()' enhances the robustness of the code. However, 
it's essential to ensure that this additional check aligns with the expected 
logic and does not hide potential issues where 'registryConfig' should not be 
null. <br> **Fix**: Validate that the added null check is indeed required and 
does not mask a scenario where a null 'registryConfig' indicates a 
misconfiguration or a bug elsewhere in the code. <br> **Code Suggestion**: 
    ```
    Validate that the added null check is indeed required and does not mask a 
scenario where a null 'registryConfig' indicates a misconfiguration or a bug 
elsewhere in the code.
    ```
   
   



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java:
##########
@@ -16,123 +16,103 @@
  */
 package org.apache.dubbo.rpc.protocol.tri.h12.grpc;
 
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.ConfigurationUtils;
+import org.apache.dubbo.common.io.StreamUtils;
+import org.apache.dubbo.common.utils.ArrayUtils;
 import org.apache.dubbo.remoting.http12.exception.DecodeException;
 import org.apache.dubbo.remoting.http12.exception.EncodeException;
 import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
 import org.apache.dubbo.remoting.http12.message.MediaType;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+import org.apache.dubbo.rpc.model.MethodDescriptor;
+import org.apache.dubbo.rpc.model.PackableMethod;
+import org.apache.dubbo.rpc.model.PackableMethodFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
-import com.google.protobuf.Message;
-
-import static 
org.apache.dubbo.common.constants.CommonConstants.PROTOBUF_MESSAGE_CLASS_NAME;
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY;
+import static 
org.apache.dubbo.common.constants.CommonConstants.DUBBO_PACKABLE_METHOD_FACTORY;
 
 public class GrpcCompositeCodec implements HttpMessageCodec {
 
-    private final ProtobufHttpMessageCodec protobufHttpMessageCodec;
+    private static final String PACKABLE_METHOD_CACHE = 
"PACKABLE_METHOD_CACHE";
 
-    private final WrapperHttpMessageCodec wrapperHttpMessageCodec;
+    private final URL url;
 
-    public GrpcCompositeCodec(
-            ProtobufHttpMessageCodec protobufHttpMessageCodec, 
WrapperHttpMessageCodec wrapperHttpMessageCodec) {
-        this.protobufHttpMessageCodec = protobufHttpMessageCodec;
-        this.wrapperHttpMessageCodec = wrapperHttpMessageCodec;
-    }
+    private final FrameworkModel frameworkModel;
+
+    private final String mediaType;
 
-    public void setEncodeTypes(Class<?>[] encodeTypes) {
-        this.wrapperHttpMessageCodec.setEncodeTypes(encodeTypes);
+    private PackableMethod packableMethod;
+
+    public GrpcCompositeCodec(URL url, FrameworkModel frameworkModel, String 
mediaType) {
+        this.url = url;
+        this.frameworkModel = frameworkModel;
+        this.mediaType = mediaType;
     }
 
-    public void setDecodeTypes(Class<?>[] decodeTypes) {
-        this.wrapperHttpMessageCodec.setDecodeTypes(decodeTypes);
+    public void loadPackableMethod(MethodDescriptor methodDescriptor) {
+        if (methodDescriptor instanceof PackableMethod) {
+            packableMethod = (PackableMethod) methodDescriptor;
+            return;
+        }
+        Map<MethodDescriptor, PackableMethod> cacheMap = 
(Map<MethodDescriptor, PackableMethod>) url.getServiceModel()
+                .getServiceMetadata()
+                .getAttributeMap()
+                .computeIfAbsent(PACKABLE_METHOD_CACHE, k -> new 
ConcurrentHashMap<>());
+        packableMethod = cacheMap.computeIfAbsent(methodDescriptor, md -> 
frameworkModel
+                .getExtensionLoader(PackableMethodFactory.class)
+                
.getExtension(ConfigurationUtils.getGlobalConfiguration(url.getApplicationModel())
+                        .getString(DUBBO_PACKABLE_METHOD_FACTORY, DEFAULT_KEY))
+                .create(methodDescriptor, url, mediaType));
     }
 
     @Override
     public void encode(OutputStream outputStream, Object data, Charset 
charset) throws EncodeException {
-        // protobuf
-        // TODO int compressed = 
Identity.MESSAGE_ENCODING.equals(requestMetadata.compressor.getMessageEncoding())
 ? 0 :
-        // 1;
         try {
-            int compressed = 0;
-            outputStream.write(compressed);
-            if (isProtobuf(data)) {
-                ProtobufWriter.write(protobufHttpMessageCodec, outputStream, 
data);
-                return;
-            }
-            // wrapper
-            wrapperHttpMessageCodec.encode(outputStream, data);
-        } catch (IOException e) {
+            outputStream.write(0);
+            byte[] bytes = packableMethod.packResponse(data);
+            writeLength(outputStream, bytes.length);
+            outputStream.write(bytes);
+        } catch (Exception e) {
             throw new EncodeException(e);
         }
     }
 
     @Override
     public Object decode(InputStream inputStream, Class<?> targetType, Charset 
charset) throws DecodeException {
-        if (isProtoClass(targetType)) {
-            return protobufHttpMessageCodec.decode(inputStream, targetType, 
charset);
+        try {
+            byte[] data = StreamUtils.readBytes(inputStream);
+            return packableMethod.parseRequest(data);
+        } catch (Exception e) {
+            throw new DecodeException(e);
         }
-        return wrapperHttpMessageCodec.decode(inputStream, targetType, 
charset);
     }
 
     @Override
     public Object[] decode(InputStream inputStream, Class<?>[] targetTypes, 
Charset charset) throws DecodeException {
-        if (targetTypes.length > 1) {
-            return wrapperHttpMessageCodec.decode(inputStream, targetTypes, 
charset);
-        }
-        return HttpMessageCodec.super.decode(inputStream, targetTypes, 
charset);
-    }
-
-    private static void writeLength(OutputStream outputStream, int length) {
-        try {
-            outputStream.write(((length >> 24) & 0xFF));
-            outputStream.write(((length >> 16) & 0xFF));
-            outputStream.write(((length >> 8) & 0xFF));
-            outputStream.write((length & 0xFF));
-        } catch (IOException e) {
-            throw new EncodeException(e);
+        Object message = decode(inputStream, ArrayUtils.isEmpty(targetTypes) ? 
null : targetTypes[0], charset);
+        if (message instanceof Object[]) {
+            return (Object[]) message;
         }
+        return new Object[] {message};
     }
 
     @Override
     public MediaType mediaType() {
         return MediaType.APPLICATION_GRPC;
     }
 
-    private static boolean isProtobuf(Object data) {
-        if (data == null) {
-            return false;
-        }
-        return isProtoClass(data.getClass());
-    }
-
-    private static boolean isProtoClass(Class<?> clazz) {
-        while (clazz != Object.class && clazz != null) {
-            Class<?>[] interfaces = clazz.getInterfaces();
-            if (interfaces.length > 0) {
-                for (Class<?> clazzInterface : interfaces) {
-                    if 
(PROTOBUF_MESSAGE_CLASS_NAME.equalsIgnoreCase(clazzInterface.getName())) {
-                        return true;
-                    }
-                }
-            }
-            clazz = clazz.getSuperclass();
-        }
-        return false;
-    }
-
-    /**
-     * lazy init protobuf class
-     */
-    private static class ProtobufWriter {
-
-        private static void write(HttpMessageCodec codec, OutputStream 
outputStream, Object data) {
-            int serializedSize = ((Message) data).getSerializedSize();
-            // write length
-            writeLength(outputStream, serializedSize);
-            codec.encode(outputStream, data);
-        }
+    private void writeLength(OutputStream outputStream, int length) throws 
IOException {
+        outputStream.write(((length >> 24) & 0xFF));
+        outputStream.write(((length >> 16) & 0xFF));
+        outputStream.write(((length >> 8) & 0xFF));
+        outputStream.write((length & 0xFF));

Review Comment:
    **Performance Issue**: Refactoring the codec to use a more streamlined 
approach for handling encoding and decoding, focusing on performance and 
maintainability. <br> **Fix**: Refactor the codec implementation to focus on 
performance by using a more efficient method for encoding and decoding, 
ensuring thread safety and reducing complexity. <br> **Code Suggestion**: 
    ```
    Refactor the codec implementation in GrpcCompositeCodec.java to improve 
performance and maintainability. Focus on streamlining the approach for 
handling encoding and decoding. Ensure the new implementation is thread-safe 
and reduces complexity.
    ```
   
   



##########
dubbo-common/src/main/java/org/apache/dubbo/config/AbstractInterfaceConfig.java:
##########
@@ -252,7 +252,7 @@ protected void postProcessAfterScopeModelChanged(ScopeModel 
oldScopeModel, Scope
         }
         if (CollectionUtils.isNotEmpty(this.registries)) {
             this.registries.forEach(registryConfig -> {
-                if (registryConfig.getScopeModel() != applicationModel) {
+                if (registryConfig != null && registryConfig.getScopeModel() 
!= applicationModel) {

Review Comment:
    **Performance Issue**: Potential null pointer access: this conditional 
expression can lead to a null pointer exception. <br> **Fix**: Ensure 
'registryConfig' is not null before accessing its methods to avoid 
NullPointerException. <br> **Code Suggestion**: 
    ```
    -                if (registryConfig.getScopeModel() != applicationModel) {
    +                if (registryConfig != null && 
registryConfig.getScopeModel() != applicationModel) {
    ```
   
   



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java:
##########
@@ -16,123 +16,103 @@
  */
 package org.apache.dubbo.rpc.protocol.tri.h12.grpc;
 
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.ConfigurationUtils;
+import org.apache.dubbo.common.io.StreamUtils;
+import org.apache.dubbo.common.utils.ArrayUtils;
 import org.apache.dubbo.remoting.http12.exception.DecodeException;
 import org.apache.dubbo.remoting.http12.exception.EncodeException;
 import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
 import org.apache.dubbo.remoting.http12.message.MediaType;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+import org.apache.dubbo.rpc.model.MethodDescriptor;
+import org.apache.dubbo.rpc.model.PackableMethod;
+import org.apache.dubbo.rpc.model.PackableMethodFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
-import com.google.protobuf.Message;
-
-import static 
org.apache.dubbo.common.constants.CommonConstants.PROTOBUF_MESSAGE_CLASS_NAME;
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY;
+import static 
org.apache.dubbo.common.constants.CommonConstants.DUBBO_PACKABLE_METHOD_FACTORY;
 
 public class GrpcCompositeCodec implements HttpMessageCodec {
 
-    private final ProtobufHttpMessageCodec protobufHttpMessageCodec;
+    private static final String PACKABLE_METHOD_CACHE = 
"PACKABLE_METHOD_CACHE";
 
-    private final WrapperHttpMessageCodec wrapperHttpMessageCodec;
+    private final URL url;
 
-    public GrpcCompositeCodec(
-            ProtobufHttpMessageCodec protobufHttpMessageCodec, 
WrapperHttpMessageCodec wrapperHttpMessageCodec) {
-        this.protobufHttpMessageCodec = protobufHttpMessageCodec;
-        this.wrapperHttpMessageCodec = wrapperHttpMessageCodec;
-    }
+    private final FrameworkModel frameworkModel;
+
+    private final String mediaType;
 
-    public void setEncodeTypes(Class<?>[] encodeTypes) {
-        this.wrapperHttpMessageCodec.setEncodeTypes(encodeTypes);
+    private PackableMethod packableMethod;
+
+    public GrpcCompositeCodec(URL url, FrameworkModel frameworkModel, String 
mediaType) {
+        this.url = url;
+        this.frameworkModel = frameworkModel;
+        this.mediaType = mediaType;
     }
 
-    public void setDecodeTypes(Class<?>[] decodeTypes) {
-        this.wrapperHttpMessageCodec.setDecodeTypes(decodeTypes);
+    public void loadPackableMethod(MethodDescriptor methodDescriptor) {
+        if (methodDescriptor instanceof PackableMethod) {
+            packableMethod = (PackableMethod) methodDescriptor;
+            return;
+        }
+        Map<MethodDescriptor, PackableMethod> cacheMap = 
(Map<MethodDescriptor, PackableMethod>) url.getServiceModel()
+                .getServiceMetadata()
+                .getAttributeMap()
+                .computeIfAbsent(PACKABLE_METHOD_CACHE, k -> new 
ConcurrentHashMap<>());
+        packableMethod = cacheMap.computeIfAbsent(methodDescriptor, md -> 
frameworkModel
+                .getExtensionLoader(PackableMethodFactory.class)
+                
.getExtension(ConfigurationUtils.getGlobalConfiguration(url.getApplicationModel())
+                        .getString(DUBBO_PACKABLE_METHOD_FACTORY, DEFAULT_KEY))
+                .create(methodDescriptor, url, mediaType));
     }
 
     @Override
     public void encode(OutputStream outputStream, Object data, Charset 
charset) throws EncodeException {
-        // protobuf
-        // TODO int compressed = 
Identity.MESSAGE_ENCODING.equals(requestMetadata.compressor.getMessageEncoding())
 ? 0 :
-        // 1;
         try {
-            int compressed = 0;
-            outputStream.write(compressed);
-            if (isProtobuf(data)) {
-                ProtobufWriter.write(protobufHttpMessageCodec, outputStream, 
data);
-                return;
-            }
-            // wrapper
-            wrapperHttpMessageCodec.encode(outputStream, data);
-        } catch (IOException e) {
+            outputStream.write(0);

Review Comment:
    **Security Issue**: Direct writing of unvalidated data to an OutputStream 
could lead to data integrity issues. Ensure data is validated or sanitized. 
<br> **Fix**: Validate or sanitize output data before writing to an 
OutputStream to prevent data integrity issues. <br> **Code Suggestion**: 
    ```
    In GrpcCompositeCodec.java, before writing data to an OutputStream, 
implement a validation or sanitization process to ensure the integrity of the 
data being written.
    ```
   
   



##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java:
##########
@@ -167,8 +160,8 @@ private void processBody() throws IOException {
         requiredLength = lengthFieldOffset + lengthFieldLength;
     }
 
-    protected void invokeListener(InputStream inputStream) {
-        this.listener.onFragmentMessage(dataHeader, inputStream);
+    public void invokeListener(InputStream inputStream) {

Review Comment:
    **Security Issue**: Public method 'invokeListener' should not expose its 
internal InputStream directly, consider copying the data or using a safer 
access method. <br> **Fix**: Ensure encapsulation of internal streams to 
prevent unauthorized stream manipulation. <br> **Code Suggestion**: 
    ```
    -    protected void invokeListener(InputStream inputStream) {
    +    public void invokeListener(InputStream inputStream) {
    ```
   
   



##########
dubbo-common/src/main/java/org/apache/dubbo/config/AbstractInterfaceConfig.java:
##########
@@ -252,7 +252,7 @@ protected void postProcessAfterScopeModelChanged(ScopeModel 
oldScopeModel, Scope
         }
         if (CollectionUtils.isNotEmpty(this.registries)) {
             this.registries.forEach(registryConfig -> {
-                if (registryConfig.getScopeModel() != applicationModel) {
+                if (registryConfig != null && registryConfig.getScopeModel() 
!= applicationModel) {

Review Comment:
    **Security Issue**: Null check added for registryConfig before accessing 
its methods could prevent potential NullPointerException. <br> **Fix**: Ensure 
that objects are checked for null before method access to prevent 
NullPointerException. <br> **Code Suggestion**: 
    ```
    -        return StringUtils.isEmpty(getGroup())
    -                ? (interfaceConfig != null ? interfaceConfig.getGroup() : 
getGroup())
    -                : getGroup();
    +        if (StringUtils.isEmpty(getGroup()) && interfaceConfig != null) {
    +            return interfaceConfig.getGroup();
    +        }
    +        return getGroup();
    ```
   
   



##########
dubbo-common/src/main/java/org/apache/dubbo/config/AbstractInterfaceConfig.java:
##########
@@ -252,7 +252,7 @@ protected void postProcessAfterScopeModelChanged(ScopeModel 
oldScopeModel, Scope
         }
         if (CollectionUtils.isNotEmpty(this.registries)) {
             this.registries.forEach(registryConfig -> {
-                if (registryConfig.getScopeModel() != applicationModel) {
+                if (registryConfig != null && registryConfig.getScopeModel() 
!= applicationModel) {
                     registryConfig.setScopeModel(applicationModel);
                 }
             });

Review Comment:
    **Scalability Issue**: Null check added for registryConfig in the lambda 
expression. This change prevents potential NullPointerExceptions when iterating 
over registries, enhancing the robustness of the code, especially under scaling 
scenarios where dynamic registry updates might lead to temporary 
inconsistencies. <br> **Fix**: The fix properly handles cases where 
registryConfig might be null, ensuring that the system remains resilient and 
functional even when facing transient state inconsistencies during scaling 
operations or registry updates. <br> **Code Suggestion**: 
    ```
    -                if (registryConfig.getScopeModel() != applicationModel) {
    +                if (registryConfig != null && 
registryConfig.getScopeModel() != applicationModel) {
    ```
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@dubbo.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@dubbo.apache.org
For additional commands, e-mail: notifications-h...@dubbo.apache.org

Reply via email to