Gerrrr commented on a change in pull request #11945:
URL: https://github.com/apache/kafka/pull/11945#discussion_r836431097
##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
##########
@@ -92,6 +103,32 @@ public void setIfUnset(final SerdeGetter getter) {
         return buf.array();
     }
 
+    private byte[] serializeV1(final String topic, final SubscriptionResponseWrapper<V> data) {
+        final byte[] serializedData = data.getForeignValue() == null ? null : serializer.serialize(topic, data.getForeignValue());
+        final int serializedDataLength = serializedData == null ? 0 : serializedData.length;
+        final long[] originalHash = data.getOriginalValueHash();
+        final int hashLength = originalHash == null ? 0 : 2 * Long.BYTES;
+        final int primaryPartitionLength = Integer.BYTES;
+        final int dataLength = 1 + hashLength + serializedDataLength + primaryPartitionLength;
+
+        final ByteBuffer buf = ByteBuffer.allocate(dataLength);
+
+        if (originalHash != null) {
+            buf.put(data.getVersion());
+        } else {
+            buf.put((byte) (data.getVersion() | (byte) 0x80));
+        }
+        buf.putInt(data.getPrimaryPartition());

Review comment:
I am not sure if this is a good idea. On the plus side, we would remove the duplicated parts in `serialize`. However, there are 2 downsides:

* We use the v0 path in 2 situations: when it is *real* v0, and when we upgrade from v0 (i.e. a data record can actually have a primary partition and v1 as part of its state). This means that we can't just put `data.getVersion()` into the buffer, but have to hardcode it to `0`. Making this logic generic enough to handle v0, upgrade-from, and v1 will be cumbersome.
* Moving a fixed-size field to the end of the byte array will complicate deserialization. Consider https://github.com/apache/kafka/blob/d9f6c52e2fa3a7ebd46efdade63a0c392acc8d0c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java#L234-L242
  Right now, the logic for finding the PK size is straightforward. It can be summarized as "take it from here until the end of the buffer".
  If we move the primary partition to the end, we'll have to subtract another 4 bytes iff the version is greater than 0.

I'd say that code duplication is the lesser evil in this case. WDYT? I am happy to move the primary partition to the end if you don't agree.
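To make the deserialization trade-off concrete, here is a minimal, hypothetical sketch. It does not reproduce the real `SubscriptionResponseWrapperSerde` or `SubscriptionWrapperSerde`: the class `PartitionPlacementSketch`, its methods, and the exact field order are assumptions for illustration. It only borrows the version byte with the `0x80` "hash is null" flag and the 16-byte hash from the diff, and assumes the variable-length payload is written last with no length prefix. It then compares putting the 4-byte primary partition right after the header (layout A) with putting it at the very end (layout B).

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical illustration only; this is not Kafka's actual serde code.
// Assumed layout: [header byte][optional fixed-size fields][payload with no length prefix].
public class PartitionPlacementSketch {
    // High bit of the header byte marks "hash is null", mirroring the 0x80 flag in the diff.
    private static final int NULL_HASH_FLAG = 0x80;

    private static byte header(final byte version, final long[] hash) {
        return hash == null ? (byte) (version | NULL_HASH_FLAG) : version;
    }

    private static int hashLength(final long[] hash) {
        return hash == null ? 0 : 2 * Long.BYTES;
    }

    // Only versions > 0 carry a primary partition in this sketch.
    private static int partitionLength(final int version) {
        return version > 0 ? Integer.BYTES : 0;
    }

    // Layout A: [header][partition if v > 0][hash?][payload ... end of buffer]
    public static byte[] encodeFront(final byte version, final long[] hash, final int partition, final byte[] payload) {
        final ByteBuffer buf = ByteBuffer.allocate(1 + partitionLength(version) + hashLength(hash) + payload.length);
        buf.put(header(version, hash));
        if (version > 0) {
            buf.putInt(partition);
        }
        if (hash != null) {
            buf.putLong(hash[0]).putLong(hash[1]);
        }
        buf.put(payload);
        return buf.array();
    }

    // Layout A decoding: after skipping the fixed-size fields, the payload is
    // simply "from here until the end of the buffer".
    public static byte[] payloadFront(final byte[] data) {
        final ByteBuffer buf = ByteBuffer.wrap(data);
        final int h = buf.get();
        final int version = h & 0x7F;
        if (version > 0) {
            buf.getInt(); // skip the primary partition
        }
        if ((h & NULL_HASH_FLAG) == 0) {
            buf.getLong();
            buf.getLong(); // skip the 16-byte hash
        }
        final byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return payload;
    }

    // Layout B: [header][hash?][payload][partition if v > 0]
    public static byte[] encodeEnd(final byte version, final long[] hash, final int partition, final byte[] payload) {
        final ByteBuffer buf = ByteBuffer.allocate(1 + hashLength(hash) + payload.length + partitionLength(version));
        buf.put(header(version, hash));
        if (hash != null) {
            buf.putLong(hash[0]).putLong(hash[1]);
        }
        buf.put(payload);
        if (version > 0) {
            buf.putInt(partition);
        }
        return buf.array();
    }

    // Layout B decoding: the payload length itself now depends on the version,
    // because the trailing 4 bytes must be excluded iff version > 0.
    public static byte[] payloadEnd(final byte[] data) {
        final ByteBuffer buf = ByteBuffer.wrap(data);
        final int h = buf.get();
        final int version = h & 0x7F;
        if ((h & NULL_HASH_FLAG) == 0) {
            buf.getLong();
            buf.getLong(); // skip the 16-byte hash
        }
        final byte[] payload = new byte[buf.remaining() - partitionLength(version)];
        buf.get(payload);
        return payload;
    }

    public static void main(final String[] args) {
        final byte[] payload = {10, 20, 30};
        final long[] hash = {1L, 2L};
        for (final byte version : new byte[] {0, 1}) {
            final byte[] a = payloadFront(encodeFront(version, hash, 7, payload));
            final byte[] b = payloadEnd(encodeEnd(version, null, 7, payload));
            System.out.println("v" + version + ": front=" + Arrays.toString(a) + ", end=" + Arrays.toString(b));
        }
    }
}
```

Both decoders branch on the version, but in layout A the branch only skips a fixed-size field and the payload length stays "whatever remains". In layout B, the payload length computation itself has to know the version, which is the extra subtraction the comment is arguing against.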