C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r863329552
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeConnectProtocol.java:
##########
@@ -230,15 +230,16 @@ public static ExtendedWorkerState deserializeMetadata(ByteBuffer buffer) {
      *   ScheduledDelay     => Int32
      * </pre>
      */
-    public static ByteBuffer serializeAssignment(ExtendedAssignment assignment) {
+    public static ByteBuffer serializeAssignment(ExtendedAssignment assignment, boolean sessioned) {
         // comparison depends on reference equality for now
         if (assignment == null || ExtendedAssignment.empty().equals(assignment)) {
             return null;
         }
         Struct struct = assignment.toStruct();
-        ByteBuffer buffer = ByteBuffer.allocate(CONNECT_PROTOCOL_HEADER_V1.sizeOf()
+        Struct protocolHeader = sessioned ? CONNECT_PROTOCOL_HEADER_V2 : CONNECT_PROTOCOL_HEADER_V1;
+        ByteBuffer buffer = ByteBuffer.allocate(protocolHeader.sizeOf()
                                                 + ASSIGNMENT_V1.sizeOf(struct));
-        CONNECT_PROTOCOL_HEADER_V1.writeTo(buffer);
+        protocolHeader.writeTo(buffer);

Review Comment:
   Ah yeah, good call! Much cleaner than what we had before. It was a little more involved than I initially thought to make this change but IMO the end result is cleaner and easier to read, so hopefully it's worth the inflation in the diff here.