C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r863329552


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeConnectProtocol.java:
##########
@@ -230,15 +230,16 @@ public static ExtendedWorkerState 
deserializeMetadata(ByteBuffer buffer) {
      *   ScheduledDelay     => Int32
      * </pre>
      */
-    public static ByteBuffer serializeAssignment(ExtendedAssignment 
assignment) {
+    public static ByteBuffer serializeAssignment(ExtendedAssignment 
assignment, boolean sessioned) {
         // comparison depends on reference equality for now
         if (assignment == null || 
ExtendedAssignment.empty().equals(assignment)) {
             return null;
         }
         Struct struct = assignment.toStruct();
-        ByteBuffer buffer = 
ByteBuffer.allocate(CONNECT_PROTOCOL_HEADER_V1.sizeOf()
+        Struct protocolHeader = sessioned ? CONNECT_PROTOCOL_HEADER_V2 : 
CONNECT_PROTOCOL_HEADER_V1;
+        ByteBuffer buffer = ByteBuffer.allocate(protocolHeader.sizeOf()
                                                 + 
ASSIGNMENT_V1.sizeOf(struct));
-        CONNECT_PROTOCOL_HEADER_V1.writeTo(buffer);
+        protocolHeader.writeTo(buffer);

Review Comment:
   Ah yeah, good call! Much cleaner than what we had before. It was a little 
more involved than I initially thought to make this change but IMO the end 
result is cleaner and easier to read, so hopefully it's worth the inflation in 
the diff here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to