chia7712 commented on a change in pull request #9563:
URL: https://github.com/apache/kafka/pull/9563#discussion_r519343434



##########
File path: generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
##########
@@ -1579,58 +1566,58 @@ private void generateVariableLengthFieldSize(FieldSpec field,
                     }
                     if (tagged) {
                         headerGenerator.addImport(MessageGenerator.BYTE_UTILS_CLASS);
+                        buffer.printf("int _arraySize = _size.totalSize() - _sizeBeforeArray;%n");
                         buffer.printf("_cache.setArraySizeInBytes(%s, _arraySize);%n",
                             field.camelCaseName());
-                        buffer.printf("_size += _arraySize + ByteUtils.sizeOfUnsignedVarint(_arraySize);%n");
-                    } else {
-                        buffer.printf("_size += _arraySize;%n");
+                        buffer.printf("_size.addBytes(ByteUtils.sizeOfUnsignedVarint(_arraySize));%n");
                     }
                 } else if (field.type().isBytes()) {
+                    buffer.printf("int _sizeBeforeBytes = _size.totalSize();%n");

Review comment:
       ```java
   if (tagged) {
     buffer.printf("int _sizeBeforeBytes = _size.totalSize();%n");
   }
   ```

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/Message.java
##########
@@ -47,7 +47,20 @@
      *                      If the specified version is too new to be supported
      *                      by this software.
      */
-    int size(ObjectSerializationCache cache, short version);
+    default int size(ObjectSerializationCache cache, short version) {
+        MessageSizeAccumulator size = new MessageSizeAccumulator();
+        addSize(size, cache, version);
+        return size.totalSize();
+    }
+
+    /**
+     * Add the size of this message to an accumulator.
+     *
+     * @param size          The size accumulator to add to
+     * @param cache         The serialization size cache to populate.
+     * @param version       The version to use.
+     */
+    void addSize(MessageSizeAccumulator size, ObjectSerializationCache cache, short version);

Review comment:
       I'm thinking about how to simplify this process.
   
   Could we reuse the method ```void write(Writable writable, ObjectSerializationCache cache, short version)```? Maybe we could create a ```Writable``` implementation that does not write data to any output. Instead, it would calculate the required buffer size from the input data.
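
   A minimal sketch of that idea follows. It is not Kafka code: `SimpleWritable` is a hypothetical, cut-down stand-in for `Writable`, and the message layout (an int id followed by a length-prefixed UTF-8 name) is invented purely for illustration. The point is only that one `write` routine can serve both serialization and size calculation when the sink counts bytes instead of storing them.

```java
import java.nio.charset.StandardCharsets;

public class SizeCountingSketch {
    /** Hypothetical, simplified stand-in for Writable (assumption, not the Kafka interface). */
    interface SimpleWritable {
        void writeByte(byte val);
        void writeInt(int val);
        void writeByteArray(byte[] arr);
    }

    /** A sink that discards the data and only accumulates the number of bytes. */
    static final class SizeCountingWritable implements SimpleWritable {
        private int size = 0;

        @Override
        public void writeByte(byte val) {
            size += Byte.BYTES;
        }

        @Override
        public void writeInt(int val) {
            size += Integer.BYTES;
        }

        @Override
        public void writeByteArray(byte[] arr) {
            size += arr.length;
        }

        int totalSize() {
            return size;
        }
    }

    /** Illustrative write routine: int id, int name length, then the name bytes. */
    static void write(SimpleWritable out, int id, String name) {
        byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
        out.writeInt(id);
        out.writeInt(nameBytes.length);
        out.writeByteArray(nameBytes);
    }

    /** Computes the serialized size by running the write routine against the counting sink. */
    static int sizeOf(int id, String name) {
        SizeCountingWritable counter = new SizeCountingWritable();
        write(counter, id, name);
        return counter.totalSize();
    }

    public static void main(String[] args) {
        // 4 bytes (id) + 4 bytes (length prefix) + 5 bytes ("kafka")
        System.out.println(sizeOf(7, "kafka"));
    }
}
```

   Whether this fits the generated code depends on how `write` interacts with `ObjectSerializationCache`, which the sketch does not model.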

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/EnvelopeRequest.java
##########
@@ -91,4 +91,14 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
     public static EnvelopeRequest parse(ByteBuffer buffer, short version) {
         return new EnvelopeRequest(ApiKeys.ENVELOPE.parseRequest(version, buffer), version);
     }
+
+    public EnvelopeRequestData data() {
+        return data;
+    }
+
+    @Override
+    public Send toSend(String destination, RequestHeader header) {

Review comment:
       If all requests use auto-generated data, should this be the default implementation in ```AbstractRequest```?




