zheguang commented on code in PR #21706:
URL: https://github.com/apache/kafka/pull/21706#discussion_r2935197888


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializer.java:
##########
@@ -98,17 +98,27 @@ static Headers headers(final byte[] rawAggregationWithHeaders) {
      * Extract the raw aggregation bytes from serialized AggregationWithHeaders,
      * stripping the headers prefix.
      */
-    static byte[] rawAggregation(final byte[] aggregationWithHeaders) {
+    public static byte[] rawAggregation(final byte[] aggregationWithHeaders) {
         if (aggregationWithHeaders == null) {
             return null;
         }
 
+        // If the header is empty, then copy the value bytes directly
+        if (aggregationWithHeaders.length > 0 && aggregationWithHeaders[0] == 0x00) {
+            // Strip header size's varint byte, and empty headers consume no bytes
+            final byte[] res = new byte[aggregationWithHeaders.length - 1]; 
+            System.arraycopy(aggregationWithHeaders, 1, res, 0, res.length);
+            return res;
+        }
+
         final ByteBuffer buffer = ByteBuffer.wrap(aggregationWithHeaders);
-        readHeaders(buffer);
+        // Skip the headers bytes without deserialization or copying
+        final int headersSize = ByteUtils.readVarint(buffer);
+        buffer.position(buffer.position() + headersSize); 
         return readBytes(buffer, buffer.remaining());
     }
 
-    private static Headers readHeaders(final ByteBuffer buffer) {
+    public static Headers readHeaders(final ByteBuffer buffer) {
         final int headersSize = ByteUtils.readVarint(buffer);
         final byte[] rawHeaders = readBytes(buffer, headersSize);
         return HeadersDeserializer.deserialize(rawHeaders);

Review Comment:
   Hm... not sure. For empty headers there is already a fast (enough?) path in `HeadersDeserializer.deserialize`:
   ```java
   // in HeadersDeserializer
       public static Headers deserialize(final byte[] data) {
           if (data == null || data.length == 0) {
               return new RecordHeaders();
           }
   ```
   Btw, `readHeaders` accepts a `ByteBuffer`, whereas the other optimized methods take a `byte[]`, which is where the fast path you proposed is more natural.
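
To make the point above concrete, here is a minimal, self-contained sketch of what happens on the `readHeaders` path when the headers section is empty. It does not use the Kafka classes: `readVarint`, `readBytes`, and `deserialize` are hypothetical stand-ins for `ByteUtils.readVarint`, the private `readBytes` helper, and `HeadersDeserializer.deserialize`, and a `List<Byte>` stands in for `Headers`. Only the null/empty check in `deserialize` mirrors the snippet quoted above; the rest is assumed for illustration.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class EmptyHeadersPathSketch {

    // Counts how often the stand-in parser goes past the empty-input check.
    static int fullParses = 0;

    // Hypothetical stand-in for ByteUtils.readVarint: zigzag-decoded 32-bit varint.
    // Assumes well-formed input (at most five bytes).
    static int readVarint(final ByteBuffer buffer) {
        int raw = 0;
        int shift = 0;
        byte b;
        do {
            b = buffer.get();
            raw |= (b & 0x7f) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1);
    }

    // Hypothetical stand-in for the readBytes helper: copies `length` bytes out of the buffer.
    static byte[] readBytes(final ByteBuffer buffer, final int length) {
        final byte[] bytes = new byte[length];
        buffer.get(bytes);
        return bytes;
    }

    // Hypothetical stand-in for HeadersDeserializer.deserialize.
    // The early return mirrors the quoted snippet; the "parsing" below is a placeholder.
    static List<Byte> deserialize(final byte[] data) {
        if (data == null || data.length == 0) {
            return new ArrayList<>();
        }
        fullParses++;
        final List<Byte> headers = new ArrayList<>();
        for (final byte b : data) {
            headers.add(b);
        }
        return headers;
    }

    // Same shape as readHeaders in the diff: size prefix, raw bytes, then deserialize.
    static List<Byte> readHeaders(final ByteBuffer buffer) {
        final int headersSize = readVarint(buffer);
        final byte[] rawHeaders = readBytes(buffer, headersSize);
        return deserialize(rawHeaders);
    }

    public static void main(final String[] args) {
        // 0x00 is the varint for headersSize == 0; the two bytes after it are the aggregation.
        final ByteBuffer buffer = ByteBuffer.wrap(new byte[] {0x00, 0x2A, 0x2B});
        final List<Byte> headers = readHeaders(buffer);
        System.out.println("headers empty: " + headers.isEmpty());
        System.out.println("full parses: " + fullParses);
        System.out.println("aggregation bytes left: " + buffer.remaining());
    }
}
```

In this sketch the empty case costs one varint read, one zero-length array allocation, and the early return, which is the "fast (enough?)" path in question. Checking the first byte directly would also require peeking into the `ByteBuffer` without disturbing its position, which is less natural than indexing into a `byte[]`.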


