Gerrrr commented on a change in pull request #11945:
URL: https://github.com/apache/kafka/pull/11945#discussion_r836431097



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
##########
@@ -92,6 +103,32 @@ public void setIfUnset(final SerdeGetter getter) {
             return buf.array();
         }
 
+        private byte[] serializeV1(final String topic, final SubscriptionResponseWrapper<V> data) {
+            final byte[] serializedData = data.getForeignValue() == null ? null : serializer.serialize(topic, data.getForeignValue());
+            final int serializedDataLength = serializedData == null ? 0 : serializedData.length;
+            final long[] originalHash = data.getOriginalValueHash();
+            final int hashLength = originalHash == null ? 0 : 2 * Long.BYTES;
+            final int primaryPartitionLength = Integer.BYTES;
+            final int dataLength = 1 + hashLength + serializedDataLength + primaryPartitionLength;
+
+            final ByteBuffer buf = ByteBuffer.allocate(dataLength);
+
+            if (originalHash != null) {
+                buf.put(data.getVersion());
+            } else {
+                buf.put((byte) (data.getVersion() | (byte) 0x80));
+            }
+            buf.putInt(data.getPrimaryPartition());

Review comment:
       I am not sure this is a good idea. The advantage is that we would remove 
the duplicated parts of `serialize`. However, there are two downsides:
   * The v0 path is used in two situations: when the record is a *real* v0 and 
when we are upgrading from v0 (i.e. a data record can actually carry a primary 
partition and v1 as part of its state). This means we can't just put 
`data.getVersion()` into the buffer; we have to hardcode it to `0`. Making this 
logic generic enough to handle v0, upgrade-from, and v1 would be cumbersome.
   * Moving a fixed-size field to the end of the byte array would complicate 
deserialization. Consider
   
   
https://github.com/apache/kafka/blob/d9f6c52e2fa3a7ebd46efdade63a0c392acc8d0c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java#L234-L242
   
   Right now, the logic for finding the PK size is straightforward. It can be 
summarized as "take everything from here to the end of the buffer". If we move 
the primary partition to the end, we'll have to subtract another 4 bytes iff 
the version is greater than 0.
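
To make the second point concrete, here is a rough sketch of the two layouts. It is not the actual serde code: the class name `TrailingFieldSketch`, both method names, and the exact field order are assumptions for illustration. It only borrows the version byte with the `0x80` "hash is null" flag and the 16-byte hash from the diff above.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of Kafka: compares how the variable-length
// payload is located under two possible field orders.
final class TrailingFieldSketch {

    private static final int HASH_BYTES = 2 * Long.BYTES;

    // Layout A (assumed): [version][partition, v1+ only][hash, optional][payload]
    // Once the fixed-size fields are consumed, the payload is simply the rest.
    static byte[] readPayloadPartitionFirst(final byte[] bytes) {
        final ByteBuffer buf = ByteBuffer.wrap(bytes);
        final byte versionByte = buf.get();
        final int version = versionByte & 0x7f;
        final boolean hasHash = (versionByte & 0x80) == 0;
        if (version > 0) {
            buf.getInt(); // primary partition, ignored in this sketch
        }
        if (hasHash) {
            buf.position(buf.position() + HASH_BYTES); // skip the hash
        }
        final byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return payload;
    }

    // Layout B (assumed): [version][hash, optional][payload][partition, v1+ only]
    // The payload length now depends on the version because of the trailer.
    static byte[] readPayloadPartitionLast(final byte[] bytes) {
        final ByteBuffer buf = ByteBuffer.wrap(bytes);
        final byte versionByte = buf.get();
        final int version = versionByte & 0x7f;
        final boolean hasHash = (versionByte & 0x80) == 0;
        if (hasHash) {
            buf.position(buf.position() + HASH_BYTES); // skip the hash
        }
        final int trailer = version > 0 ? Integer.BYTES : 0;
        final byte[] payload = new byte[buf.remaining() - trailer];
        buf.get(payload);
        return payload;
    }
}
```

In layout A the payload length never needs to know the version; in layout B every reader has to remember the version-dependent 4-byte trailer.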
   
   I'd say that code duplication is the lesser evil in this case. WDYT? I am 
happy to move the primary partition to the end if you disagree.



