fqaiser94 commented on code in PR #13533:
URL: https://github.com/apache/kafka/pull/13533#discussion_r1167560778


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java:
##########
@@ -52,10 +52,10 @@ public Change<T> deserialize(final String topic, final Headers headers, final by
         // The format we need to deserialize is:
         // {BYTE_ARRAY oldValue}{BYTE newOldFlag=0}
         // {BYTE_ARRAY newValue}{BYTE newOldFlag=1}
-        // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOldFlag=2}
+        // {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOldFlag=2}
         // {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=3}
         // {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE newOldFlag=4}
-        // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=5}
+        // {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=5}

Review Comment:
   nit: the last byte is now less of a `newOldFlag` and more like a `version`? 
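To illustrate why the trailing byte behaves like a format/version tag, here is a minimal sketch that inspects only the end of a serialized `Change` and decides which of the six layouts above applies. `ChangeTrailer` is a hypothetical helper, not Kafka code, and it assumes `isLatest` is written as `1` for true and `0` for false.

```java
// Hypothetical helper (not part of Kafka): reads only the trailing byte(s) of a
// serialized Change, following the layouts listed in the ChangedDeserializer comment.
final class ChangeTrailer {
    final int flag;          // last byte: 0..5 (the "newOldFlag", arguably a format version)
    final Boolean isLatest;  // only present for flags 3..5; null otherwise
    final int payloadEnd;    // exclusive end index of the value bytes

    private ChangeTrailer(final int flag, final Boolean isLatest, final int payloadEnd) {
        this.flag = flag;
        this.isLatest = isLatest;
        this.payloadEnd = payloadEnd;
    }

    static ChangeTrailer parse(final byte[] data) {
        if (data == null || data.length == 0) {
            throw new IllegalArgumentException("empty input");
        }
        final int flag = data[data.length - 1];
        if (flag >= 0 && flag <= 2) {
            // Layouts 0..2 end with the flag byte only
            return new ChangeTrailer(flag, null, data.length - 1);
        }
        if (flag >= 3 && flag <= 5 && data.length >= 2) {
            // Layouts 3..5 carry an isLatest byte just before the flag
            // (assumption: 1 = true, 0 = false)
            return new ChangeTrailer(flag, data[data.length - 2] != 0, data.length - 2);
        }
        throw new IllegalArgumentException("Unknown or truncated layout, flag=" + flag);
    }
}
```

Because every layout is identified by that last byte, adding a new encoding means adding a new value there, which is what makes it read like a version.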



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java:
##########
@@ -113,9 +113,9 @@ public byte[] serialize(final String topic, final Headers headers, final Change<
                 throw new StreamsException("Both old and new values are not null (" + data.oldValue
                         + " : " + data.newValue + ") in ChangeSerializer, which is not allowed unless upgrading.");
             } else {
-                final int capacity = UINT32_SIZE + newDataLength + oldDataLength + NEW_OLD_FLAG_SIZE;
+                final int capacity = MAX_VARINT_LENGTH + newDataLength + oldDataLength + NEW_OLD_FLAG_SIZE;
                 buf = ByteBuffer.allocate(capacity);
-                ByteUtils.writeUnsignedInt(buf, newDataLength);
+                ByteUtils.writeVarint(newDataLength, buf);

Review Comment:
   nit: I think it would be simpler to just use `buf.putInt(newDataLength)`, 
which always writes out 4 bytes. This is the approach taken in other similar 
places in the codebase, such as 
[`CombinedKeySchema.toBytes`](https://github.com/apache/kafka/blob/9c12e462106343fbc6af5873074d48f98687af39/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java#L74), 
[`ProcessorRecordContext.serialize`](https://github.com/apache/kafka/blob/9c12e462106343fbc6af5873074d48f98687af39/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java#L104), 
and 
[`ProcessorMetadata.serialize`](https://github.com/apache/kafka/blob/9c12e462106343fbc6af5873074d48f98687af39/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorMetadata.java#L78).
   
   While it's nice that `ByteUtils.writeVarint` can potentially write out fewer 
bytes, the variable number of bytes used to encode an int makes the code more 
complex: you now have to allocate an array of the maximum possible size and 
then copy the result into a smaller array. IMO the extra code complexity is not 
worth saving at most 3 bytes per length prefix. 
   
   Either way, functionally speaking both approaches work so I will approve the 
PR regardless. 
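To put a number on the trade-off, here is a minimal sketch comparing the fixed 4-byte prefix written by `buf.putInt` with a zigzag varint prefix. It assumes `ByteUtils.writeVarint` uses the usual zigzag plus base-128 scheme (7 payload bits per byte); `LengthPrefixCost` and its methods are hypothetical names for illustration only.

```java
// Hypothetical sketch: how many bytes a length prefix costs under each encoding.
final class LengthPrefixCost {
    static final int FIXED_INT_SIZE = Integer.BYTES; // what buf.putInt(...) always writes

    // Assumption: zigzag varint encoding, as used by ByteUtils.writeVarint
    static int zigZagVarintSize(final int value) {
        final int zigZag = (value << 1) ^ (value >> 31);              // maps small magnitudes to small values
        final int bits = 32 - Integer.numberOfLeadingZeros(zigZag);   // significant bits (0 for zero)
        return Math.max(1, (bits + 6) / 7);                           // each byte carries 7 payload bits
    }

    // Bytes saved (negative means extra bytes) by using a varint instead of putInt
    static int bytesSaved(final int newDataLength) {
        return FIXED_INT_SIZE - zigZagVarintSize(newDataLength);
    }
}
```

Under this encoding a `newDataLength` of 63 or less fits in one byte (saving 3 bytes), while lengths of 2^27 bytes (128 MiB) or more take 5 bytes, one more than `putInt`.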
   



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java:
##########
@@ -130,7 +130,11 @@ public byte[] serialize(final String topic, final Headers headers, final Change<
             throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
         }
 
-        return buf.array();
+        final byte[] serialized = new byte[buf.position()];
+        buf.position(0);
+        buf.get(serialized);
+
+        return serialized;

Review Comment:
   If you use `buf.putInt`, then you don't need this change, or any comment 
explaining why we need to resize to a smaller array. 
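For comparison, here is a minimal sketch of the flag = 2 branch written with the fixed-width `putInt` prefix the reviewer suggests. `FixedLengthPrefixChange` is a hypothetical standalone class, not the actual `ChangedSerializer`; it only shows that the buffer ends up exactly full, so `buf.array()` can be returned without trimming.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the flag = 2 layout with a fixed 4-byte length prefix.
// Not the actual ChangedSerializer/ChangedDeserializer code.
final class FixedLengthPrefixChange {
    static final int NEW_OLD_FLAG_SIZE = 1;

    static byte[] serializeBoth(final byte[] newData, final byte[] oldData) {
        // The exact size is known up front because the prefix is always Integer.BYTES (4) long
        final int capacity = Integer.BYTES + newData.length + oldData.length + NEW_OLD_FLAG_SIZE;
        final ByteBuffer buf = ByteBuffer.allocate(capacity);
        buf.putInt(newData.length);
        buf.put(newData).put(oldData).put((byte) 2);
        // The buffer is completely filled, so its backing array can be returned as-is
        return buf.array();
    }

    // Returns {newValue, oldValue}
    static byte[][] deserializeBoth(final byte[] data) {
        // Exclude the trailing flag byte from the readable region
        final ByteBuffer buf = ByteBuffer.wrap(data, 0, data.length - NEW_OLD_FLAG_SIZE);
        final byte[] newValue = new byte[buf.getInt()];
        buf.get(newValue);
        final byte[] oldValue = new byte[buf.remaining()];
        buf.get(oldValue);
        return new byte[][] {newValue, oldValue};
    }
}
```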



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java:
##########
@@ -130,7 +130,11 @@ public byte[] serialize(final String topic, final Headers headers, final Change<
             throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
         }
 
-        return buf.array();
+        final byte[] serialized = new byte[buf.position()];
+        buf.position(0);
+        buf.get(serialized);

Review Comment:
   nit: this resize-to-smaller-array operation is only necessary in the case 
where we use `writeVarint`. I would add a comment here to explain that. 
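A sketch of the trimming step with the kind of comment the reviewer asks for, alongside an equivalent one-liner using `java.util.Arrays.copyOf`. `TrimToPosition` is a hypothetical helper; the `copyOf` variant assumes a heap buffer whose backing array starts at offset 0, which is what `ByteBuffer.allocate` returns.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper. Trimming is only needed when the buffer was sized for the
// worst case (e.g. MAX_VARINT_LENGTH for the length prefix) and the varint turned
// out shorter, leaving unused bytes at the end of buf.array().
final class TrimToPosition {
    // Mirrors the change in the PR: copy bytes [0, position) into a new array
    static byte[] viaGet(final ByteBuffer buf) {
        final byte[] serialized = new byte[buf.position()];
        buf.position(0);
        buf.get(serialized);
        return serialized;
    }

    // Same result without moving the buffer's position
    // (assumption: heap buffer with arrayOffset() == 0)
    static byte[] viaCopyOf(final ByteBuffer buf) {
        return Arrays.copyOf(buf.array(), buf.position());
    }
}
```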



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java:
##########
@@ -113,9 +113,9 @@ public byte[] serialize(final String topic, final Headers headers, final Change<
                 throw new StreamsException("Both old and new values are not null (" + data.oldValue
                         + " : " + data.newValue + ") in ChangeSerializer, which is not allowed unless upgrading.");
             } else {
-                final int capacity = UINT32_SIZE + newDataLength + oldDataLength + NEW_OLD_FLAG_SIZE;
+                final int capacity = MAX_VARINT_LENGTH + newDataLength + oldDataLength + NEW_OLD_FLAG_SIZE;
                 buf = ByteBuffer.allocate(capacity);
-                ByteUtils.writeUnsignedInt(buf, newDataLength);
+                ByteUtils.writeVarint(newDataLength, buf);
                 buf.put(newData).put(oldData).put((byte) 2);

Review Comment:
   Or, better yet, I would maybe rewrite this along these lines:
   ```
   // Worst-case size: the varint length prefix may need up to MAX_VARINT_LENGTH bytes
   final int maxCapacity = MAX_VARINT_LENGTH + newDataLength + oldDataLength + NEW_OLD_FLAG_SIZE;
   final ByteBuffer maxCapacityBuffer = ByteBuffer.allocate(maxCapacity);
   ByteUtils.writeVarint(newDataLength, maxCapacityBuffer);
   maxCapacityBuffer.put(newData).put(oldData).put((byte) 2);
   
   // Copy only the bytes actually written into an exactly-sized buffer
   final int actualCapacity = maxCapacityBuffer.position();
   buf = ByteBuffer.allocate(actualCapacity).put(maxCapacityBuffer.array(), 0, actualCapacity);
   ```
   
   (note: we obviously don't need any of this if we use the `buf.putInt` 
method). 
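To check that the two-buffer approach above round-trips, here is a minimal self-contained sketch of both sides of the flag = 2 layout. Since Kafka's `ByteUtils` is not on the classpath here, it re-implements a zigzag varint writer and reader locally (an assumption that this matches `ByteUtils.writeVarint`), and it assumes `MAX_VARINT_LENGTH` is 5, the worst case for a 32-bit value. `VarintLengthPrefixChange` is a hypothetical class name.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the flag = 2 layout with a varint length prefix,
// following the reviewer's maxCapacity / actualCapacity suggestion.
final class VarintLengthPrefixChange {
    static final int MAX_VARINT_LENGTH = 5;  // assumption: worst case for a 32-bit varint
    static final int NEW_OLD_FLAG_SIZE = 1;

    // Assumption: zigzag + base-128 encoding, intended to match ByteUtils.writeVarint
    static void writeVarint(final int value, final ByteBuffer buf) {
        int v = (value << 1) ^ (value >> 31);
        while ((v & 0xFFFFFF80) != 0) {
            buf.put((byte) ((v & 0x7F) | 0x80)); // low 7 bits, continuation bit set
            v >>>= 7;
        }
        buf.put((byte) v);
    }

    static int readVarint(final ByteBuffer buf) {
        int raw = 0;
        int shift = 0;
        byte b;
        do {
            if (shift > 28) {
                throw new IllegalArgumentException("varint too long");
            }
            b = buf.get();
            raw |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1); // undo zigzag
    }

    static byte[] serializeBoth(final byte[] newData, final byte[] oldData) {
        final int maxCapacity = MAX_VARINT_LENGTH + newData.length + oldData.length + NEW_OLD_FLAG_SIZE;
        final ByteBuffer maxCapacityBuffer = ByteBuffer.allocate(maxCapacity);
        writeVarint(newData.length, maxCapacityBuffer);
        maxCapacityBuffer.put(newData).put(oldData).put((byte) 2);

        // Copy only the bytes actually written
        final int actualCapacity = maxCapacityBuffer.position();
        final ByteBuffer buf = ByteBuffer.allocate(actualCapacity).put(maxCapacityBuffer.array(), 0, actualCapacity);
        return buf.array();
    }

    // Returns {newValue, oldValue}
    static byte[][] deserializeBoth(final byte[] data) {
        if (data[data.length - 1] != 2) {
            throw new IllegalArgumentException("not a flag = 2 layout");
        }
        final ByteBuffer buf = ByteBuffer.wrap(data, 0, data.length - NEW_OLD_FLAG_SIZE);
        final byte[] newValue = new byte[readVarint(buf)];
        buf.get(newValue);
        final byte[] oldValue = new byte[buf.remaining()];
        buf.get(oldValue);
        return new byte[][] {newValue, oldValue};
    }
}
```

Note that the reader must decode the varint before it knows where `newValue` ends, whereas the `putInt` variant can always skip a fixed 4 bytes.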


