C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r863308185


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -108,18 +107,15 @@ public Map<String, ByteBuffer> performAssignment(String leaderId, String protoco
         log.debug("Max config offset root: {}, local snapshot config offsets root: {}",
                   maxOffset, coordinator.configSnapshot().offset());
 
-        short protocolVersion = memberConfigs.values().stream()
-            .allMatch(state -> state.assignment().version() == CONNECT_PROTOCOL_V2)
-                ? CONNECT_PROTOCOL_V2
-                : CONNECT_PROTOCOL_V1;
+        short protocolVersion = ConnectProtocolCompatibility.fromProtocol(protocol).protocolVersion();

Review Comment:
   Yep, exactly 👍
   Should've known that when I implemented KIP-507 originally but was still getting my bearings with the group coordinator logic. Better late than never!



