Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-17 Thread via GitHub


dajac merged PR #15954:
URL: https://github.com/apache/kafka/pull/15954


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-16 Thread via GitHub


dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603595149


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
 String requestProtocolType,
 String requestProtocolName
 ) {
+String protocolName = 
member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
   You may be able to reuse `isProtocolInconsistent`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-16 Thread via GitHub


dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603586413


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
 String requestProtocolType,
 String requestProtocolName
 ) {
+String protocolName = 
member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
   Ah now I get it.. Let me add the check



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-16 Thread via GitHub


dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603582917


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
 String requestProtocolType,
 String requestProtocolName
 ) {
+String protocolName = 
member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
   Yes, if `requestProtocolType` or `requestProtocolName` is null, the 
validation will fail.
   
   > we should only validate them if they are non-null.
   
   We do validate them even if they are null in the existing classic group sync 
to the classic group 
https://github.com/apache/kafka/blob/056d232f4e28bf8e67e00f8ed2c103fdb0f3b78e/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L4335-L4342
 so I think we should also fail the validation if either of them is null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-16 Thread via GitHub


dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603582917


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
 String requestProtocolType,
 String requestProtocolName
 ) {
+String protocolName = 
member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
   Yes, if `requestProtocolType` or `requestProtocolName` is null, the 
validation will fail.
   
   > we should only validate them if they are non-null.
   We do validate them even if they are null in the existing classic group sync 
to the classic group 
https://github.com/apache/kafka/blob/056d232f4e28bf8e67e00f8ed2c103fdb0f3b78e/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L4335-L4342
 so I think we should also fail the validation if either of them is null.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
 String requestProtocolType,
 String requestProtocolName
 ) {
+String protocolName = 
member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
   Yes, if `requestProtocolType` or `requestProtocolName` is null, the 
validation will fail.
   
   > we should only validate them if they are non-null.
   
   We do validate them even if they are null in the existing classic group sync 
to the classic group 
https://github.com/apache/kafka/blob/056d232f4e28bf8e67e00f8ed2c103fdb0f3b78e/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L4335-L4342
 so I think we should also fail the validation if either of them is null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-16 Thread via GitHub


dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603566552


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
 String requestProtocolType,
 String requestProtocolName
 ) {
+String protocolName = 
member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
   Ah right, they should be validated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-16 Thread via GitHub


dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603566552


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
 String requestProtocolType,
 String requestProtocolName
 ) {
+String protocolName = 
member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
   Ah right, they should be validated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-16 Thread via GitHub


dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603556220


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
 String requestProtocolType,
 String requestProtocolName
 ) {
+String protocolName = 
member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
   What I said is different. What I meant is that requestProtocolType and 
requestProtocolName could be null here. If they are, the validation will fail, 
no?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-16 Thread via GitHub


dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603556220


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
 String requestProtocolType,
 String requestProtocolName
 ) {
+String protocolName = 
member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
   What I said is different. What I meant is that requestProtocolType and 
requestProtocolName could be null here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-16 Thread via GitHub


dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603545254


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
 String requestProtocolType,
 String requestProtocolName
 ) {
+String protocolName = 
member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
   I'm not sure if it's necessary as we've checked if the member uses the 
classic protocol before, but I guess it doesn't hurt to explicitly check the 
protocols here. Let me add the validation..



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-16 Thread via GitHub


dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603535727


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3931,6 +4071,74 @@ public CoordinatorResult 
classicGroupSync(
 return EMPTY_RESULT;
 }
 
+/**
+ * Handle a SyncGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The ConsumerGroup.
+ * @param contextThe request context.
+ * @param requestThe actual SyncGroup request.
+ * @param responseFuture The sync group response future.
+ *
+ * @return The result that contains the appendFuture to return the 
response.
+ */
+private CoordinatorResult 
classicGroupSyncToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+SyncGroupRequestData request,
+CompletableFuture responseFuture
+) throws UnknownMemberIdException, FencedInstanceIdException, 
IllegalGenerationException,
+InconsistentGroupProtocolException, RebalanceInProgressException, 
IllegalStateException {
+String groupId = request.groupId();
+String memberId = request.memberId();
+String instanceId = request.groupInstanceId();
+
+ConsumerGroupMember member;
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(request.memberId(), false);
+} else {
+member = group.staticMember(instanceId);
+if (member == null) {
+throw new UnknownMemberIdException(
+String.format("Member with instance id %s is not a member 
of group %s.", instanceId, groupId)
+);
+}
+throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+}
+
+throwIfMemberDoesNotUseClassicProtocol(member);
+throwIfGenerationIdUnmatched(member.memberId(), member.memberEpoch(), 
request.generationId());
+throwIfClassicProtocolUnmatched(member, request.protocolType(), 
request.protocolName());
+throwIfRebalanceInProgress(group, member);
+
+CompletableFuture appendFuture = new CompletableFuture<>();
+appendFuture.whenComplete((__, t) -> {
+if (t == null) {
+cancelConsumerGroupSyncTimeout(groupId, memberId);
+scheduleConsumerGroupSessionTimeout(groupId, memberId, 
member.classicProtocolSessionTimeout().get());
+
+responseFuture.complete(new SyncGroupResponseData()
+.setProtocolType(request.protocolType())
+.setProtocolName(request.protocolName())
+.setAssignment(prepareAssignment(member)));
+}
+});
+
+return new CoordinatorResult<>(Collections.emptyList(), appendFuture, 
false);
+}
+
+/**
+ * Serializes the member's assigned partitions with ConsumerProtocol.
+ *
+ * @param member The ConsumerGroupMember.
+ * @return The serialized assigned partitions.
+ */
+private byte[] prepareAssignment(ConsumerGroupMember member) {

Review Comment:
   Now that we have this method, I wonder if we should inline the body of 
`deserializeProtocolVersion` here. I don't think that we will reuse 
deserializeProtocolVersion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-16 Thread via GitHub


dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603525427


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
 String requestProtocolType,
 String requestProtocolName
 ) {
+String protocolName = 
member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
   I just recalled that the protocol type and name are optional in the sync 
group request so we should only validate them if they are non-null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-16 Thread via GitHub


dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603521402


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1197,6 +1199,82 @@ private void throwIfClassicProtocolIsNotSupported(
 }
 }
 
+/**
+ * Validates if the consumer group member uses the classic protocol.
+ *
+ * @param member The ConsumerGroupMember.
+ */
+private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember 
member) {
+if (!member.useClassicProtocol()) {
+throw new UnknownMemberIdException(
+String.format("Member %s does not use the classic protocol.", 
member.memberId())
+);
+}
+}
+
+/**
+ * Validates if the generation id from the request matches the member 
epoch.
+ *
+ * @param groupId   The ConsumerGroup id.
+ * @param memberEpoch   The member epoch.
+ * @param requestGenerationId   The generation id from the request.
+ */
+private void throwIfGenerationIdUnmatched(
+String groupId,
+int memberEpoch,
+int requestGenerationId
+) {
+if (memberEpoch != requestGenerationId) {
+throw Errors.ILLEGAL_GENERATION.exception(
+String.format("The request generation id %s is not equal to 
the member epoch %d from the consumer group %s.",
+requestGenerationId, memberEpoch, groupId)
+);
+}
+}
+
+/**
+ * Validates if the protocol type and the protocol name from the request 
matches those of the consumer group.
+ *
+ * @param memberThe ConsumerGroupMember.
+ * @param requestProtocolType   The protocol type from the request.
+ * @param requestProtocolName   The protocol name from the request.
+ */
+private void throwIfClassicProtocolUnmatched(
+ConsumerGroupMember member,
+String requestProtocolType,
+String requestProtocolName
+) {
+if (!ConsumerProtocol.PROTOCOL_TYPE.equals(requestProtocolType)) {
+throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception(
+String.format("The protocol type %s from member %s request is 
not equal to the group protocol type %s.",
+requestProtocolType, member.memberId(), 
ConsumerProtocol.PROTOCOL_TYPE)
+);
+} else if 
(!member.supportedClassicProtocols().get().iterator().next().name().equals(requestProtocolName))
 {

Review Comment:
   Yes it's always set here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-16 Thread via GitHub


dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1602768290


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1197,6 +1199,82 @@ private void throwIfClassicProtocolIsNotSupported(
 }
 }
 
+/**
+ * Validates if the consumer group member uses the classic protocol.
+ *
+ * @param member The ConsumerGroupMember.
+ */
+private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember 
member) {
+if (!member.useClassicProtocol()) {
+throw new UnknownMemberIdException(
+String.format("Member %s does not use the classic protocol.", 
member.memberId())
+);
+}
+}
+
+/**
+ * Validates if the generation id from the request matches the member 
epoch.
+ *
+ * @param groupId   The ConsumerGroup id.
+ * @param memberEpoch   The member epoch.
+ * @param requestGenerationId   The generation id from the request.
+ */
+private void throwIfGenerationIdUnmatched(
+String groupId,

Review Comment:
   nit: I think that we could remove the group id here as the caller knows its 
context.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1197,6 +1199,82 @@ private void throwIfClassicProtocolIsNotSupported(
 }
 }
 
+/**
+ * Validates if the consumer group member uses the classic protocol.
+ *
+ * @param member The ConsumerGroupMember.
+ */
+private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember 
member) {
+if (!member.useClassicProtocol()) {
+throw new UnknownMemberIdException(
+String.format("Member %s does not use the classic protocol.", 
member.memberId())
+);
+}
+}
+
+/**
+ * Validates if the generation id from the request matches the member 
epoch.
+ *
+ * @param groupId   The ConsumerGroup id.
+ * @param memberEpoch   The member epoch.
+ * @param requestGenerationId   The generation id from the request.
+ */
+private void throwIfGenerationIdUnmatched(
+String groupId,
+int memberEpoch,
+int requestGenerationId
+) {
+if (memberEpoch != requestGenerationId) {
+throw Errors.ILLEGAL_GENERATION.exception(
+String.format("The request generation id %s is not equal to 
the member epoch %d from the consumer group %s.",
+requestGenerationId, memberEpoch, groupId)
+);
+}
+}
+
+/**
+ * Validates if the protocol type and the protocol name from the request 
matches those of the consumer group.
+ *
+ * @param memberThe ConsumerGroupMember.
+ * @param requestProtocolType   The protocol type from the request.
+ * @param requestProtocolName   The protocol name from the request.
+ */
+private void throwIfClassicProtocolUnmatched(
+ConsumerGroupMember member,
+String requestProtocolType,
+String requestProtocolName
+) {
+if (!ConsumerProtocol.PROTOCOL_TYPE.equals(requestProtocolType)) {
+throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception(
+String.format("The protocol type %s from member %s request is 
not equal to the group protocol type %s.",
+requestProtocolType, member.memberId(), 
ConsumerProtocol.PROTOCOL_TYPE)
+);
+} else if 
(!member.supportedClassicProtocols().get().iterator().next().name().equals(requestProtocolName))
 {

Review Comment:
   nit: Should we extract 
`member.supportedClassicProtocols().get().iterator().next().name()` into a 
variable? I also assume that it should always be set at this point. Is it the 
case?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3931,6 +4066,65 @@ public CoordinatorResult 
classicGroupSync(
 return EMPTY_RESULT;
 }
 
+/**
+ * Handle a SyncGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The ConsumerGroup.
+ * @param contextThe request context.
+ * @param requestThe actual SyncGroup request.
+ * @param responseFuture The sync group response future.
+ *
+ * @return The result that contains records to append.
+ */
+private CoordinatorResult 
classicGroupSyncToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+SyncGroupRequestData request,
+CompletableFuture responseFuture
+) throws UnknownMemberIdException, FencedInstanceIdException, 
IllegalGenerationException,
+InconsistentGroupProtocolException, RebalanceInProgressException, 
IllegalStateException {
+String groupId = request.groupId();
+String 

Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-15 Thread via GitHub


dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1602027889


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3898,6 +3994,65 @@ public CoordinatorResult classicGroupSync(
 return EMPTY_RESULT;
 }
 
+/**
+ * Handle a SyncGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The ConsumerGroup.
+ * @param contextThe request context.
+ * @param requestThe actual SyncGroup request.
+ * @param responseFuture The sync group response future.
+ *
+ * @return The result that contains records to append.
+ */
+private CoordinatorResult classicGroupSyncToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+SyncGroupRequestData request,
+CompletableFuture responseFuture
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+String groupId = request.groupId();
+String memberId = request.memberId();
+String instanceId = request.groupInstanceId();
+
+ConsumerGroupMember member;
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(request.memberId(), false);
+} else {
+member = group.staticMember(instanceId);
+if (member == null) {
+throw new UnknownMemberIdException(
+String.format("Member with instance id %s is not a member 
of group %s.", instanceId, groupId)
+);
+}
+throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+}
+
+throwIfMemberDoesNotUseClassicProtocol(member);
+throwIfGenerationIdOrProtocolUnmatched(
+group,
+member,
+request.generationId(),
+request.protocolType(),
+request.protocolName()
+);
+

Review Comment:
   Yeah! I was stuck in how to determine if the member has already rejoined but 
checking the state makes sense



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-15 Thread via GitHub


dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1601951216


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3898,6 +3994,65 @@ public CoordinatorResult classicGroupSync(
 return EMPTY_RESULT;
 }
 
+/**
+ * Handle a SyncGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The ConsumerGroup.
+ * @param contextThe request context.
+ * @param requestThe actual SyncGroup request.
+ * @param responseFuture The sync group response future.
+ *
+ * @return The result that contains records to append.
+ */
+private CoordinatorResult classicGroupSyncToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+SyncGroupRequestData request,
+CompletableFuture responseFuture
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+String groupId = request.groupId();
+String memberId = request.memberId();
+String instanceId = request.groupInstanceId();
+
+ConsumerGroupMember member;
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(request.memberId(), false);
+} else {
+member = group.staticMember(instanceId);
+if (member == null) {
+throw new UnknownMemberIdException(
+String.format("Member with instance id %s is not a member 
of group %s.", instanceId, groupId)
+);
+}
+throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+}
+
+throwIfMemberDoesNotUseClassicProtocol(member);
+throwIfGenerationIdOrProtocolUnmatched(
+group,
+member,
+request.generationId(),
+request.protocolType(),
+request.protocolName()
+);
+
+cancelConsumerGroupSyncTimeout(groupId, memberId);
+//scheduleConsumerGroupSessionTimeout(groupId, memberId, 
member.classicMemberSessionTimeout());
+
+byte[] assignment = ConsumerProtocol.serializeAssignment(
+new 
ConsumerPartitionAssignor.Assignment(toTopicPartitionList(member.assignedPartitions(),
 metadataImage.topics())),
+deserializeProtocolVersion(member.classicMemberMetadata().get())
+).array();
+
+responseFuture.complete(new SyncGroupResponseData()
+.setProtocolType(request.protocolType())
+.setProtocolName(request.protocolName())
+.setAssignment(assignment)
+.setErrorCode(Errors.NONE.code()));

Review Comment:
   Ah yes, you're right. I thought there's nothing to commit for EMPTY_RESULT



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-15 Thread via GitHub


dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1601000794


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1197,6 +1197,45 @@ private void throwIfClassicProtocolIsNotSupported(
 }
 }
 
+/**
+ * Validates if the consumer group member uses the classic protocol.
+ *
+ * @param member The ConsumerGroupMember.
+ */
+private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember 
member) {
+if (!member.useClassicProtocol()) {
+throw new UnknownMemberIdException(
+String.format("Member %s does not use the classic protocol.", 
member.memberId())
+);
+}
+}
+
+/**
+ * Validates if the generation id and the protocol type from the request 
match those of the consumer group.
+ *
+ * @param group The ConsumerGroup.
+ * @param memberThe ConsumerGroupMember.
+ * @param requestGenerationId   The generation id from the request.
+ * @param requestProtocolType   The protocol type from the request.
+ * @param requestProtocolName   The protocol name from the request.
+ */
+private void throwIfGenerationIdOrProtocolUnmatched(

Review Comment:
   nit: It may be better to split this one into two methods. One to validate 
the generation. Another one to validate the protocol type and name.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1197,6 +1197,45 @@ private void throwIfClassicProtocolIsNotSupported(
 }
 }
 
+/**
+ * Validates if the consumer group member uses the classic protocol.
+ *
+ * @param member The ConsumerGroupMember.
+ */
+private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember 
member) {
+if (!member.useClassicProtocol()) {
+throw new UnknownMemberIdException(
+String.format("Member %s does not use the classic protocol.", 
member.memberId())
+);
+}
+}
+
+/**
+ * Validates if the generation id and the protocol type from the request 
match those of the consumer group.
+ *
+ * @param group The ConsumerGroup.
+ * @param memberThe ConsumerGroupMember.
+ * @param requestGenerationId   The generation id from the request.
+ * @param requestProtocolType   The protocol type from the request.
+ * @param requestProtocolName   The protocol name from the request.
+ */
+private void throwIfGenerationIdOrProtocolUnmatched(
+ConsumerGroup group,
+ConsumerGroupMember member,
+int requestGenerationId,
+String requestProtocolType,
+String requestProtocolName
+) {
+if (member.memberEpoch() != requestGenerationId) {
+throw Errors.ILLEGAL_GENERATION.exception(
+String.format("The request generation id %s is not equal to 
the group epoch %d from the consumer group %s.",

Review Comment:
   nit: `group epoch` -> `member epoch`.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1197,6 +1197,45 @@ private void throwIfClassicProtocolIsNotSupported(
 }
 }
 
+/**
+ * Validates if the consumer group member uses the classic protocol.
+ *
+ * @param member The ConsumerGroupMember.
+ */
+private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember 
member) {
+if (!member.useClassicProtocol()) {
+throw new UnknownMemberIdException(
+String.format("Member %s does not use the classic protocol.", 
member.memberId())
+);
+}
+}
+
+/**
+ * Validates if the generation id and the protocol type from the request 
match those of the consumer group.
+ *
+ * @param group The ConsumerGroup.
+ * @param memberThe ConsumerGroupMember.
+ * @param requestGenerationId   The generation id from the request.
+ * @param requestProtocolType   The protocol type from the request.
+ * @param requestProtocolName   The protocol name from the request.
+ */
+private void throwIfGenerationIdOrProtocolUnmatched(
+ConsumerGroup group,
+ConsumerGroupMember member,
+int requestGenerationId,
+String requestProtocolType,
+String requestProtocolName
+) {
+if (member.memberEpoch() != requestGenerationId) {
+throw Errors.ILLEGAL_GENERATION.exception(
+String.format("The request generation id %s is not equal to 
the group epoch %d from the consumer group %s.",
+requestGenerationId, group.groupEpoch(), group.groupId())

Review Comment:
   nit: `group.groupEpoch()` -> `member.memberEpoch()`.



##

Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-14 Thread via GitHub


dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1600493799


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3898,6 +3994,65 @@ public CoordinatorResult classicGroupSync(
 return EMPTY_RESULT;
 }
 
+/**
+ * Handle a SyncGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The ConsumerGroup.
+ * @param contextThe request context.
+ * @param requestThe actual SyncGroup request.
+ * @param responseFuture The sync group response future.
+ *
+ * @return The result that contains records to append.
+ */
+private CoordinatorResult classicGroupSyncToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+SyncGroupRequestData request,
+CompletableFuture responseFuture
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+String groupId = request.groupId();
+String memberId = request.memberId();
+String instanceId = request.groupInstanceId();
+
+ConsumerGroupMember member;
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(request.memberId(), false);
+} else {
+member = group.staticMember(instanceId);
+if (member == null) {
+throw new UnknownMemberIdException(
+String.format("Member with instance id %s is not a member 
of group %s.", instanceId, groupId)
+);
+}
+throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+}
+
+throwIfMemberDoesNotUseClassicProtocol(member);
+throwIfGenerationIdOrProtocolUnmatched(
+group,
+member,
+request.generationId(),
+request.protocolType(),
+request.protocolName()
+);
+
+cancelConsumerGroupSyncTimeout(groupId, memberId);
+//scheduleConsumerGroupSessionTimeout(groupId, memberId, 
member.classicMemberSessionTimeout());

Review Comment:
   I just merged it!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-14 Thread via GitHub


dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1600314641


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3898,6 +3994,65 @@ public CoordinatorResult classicGroupSync(
 return EMPTY_RESULT;
 }
 
+/**
+ * Handle a SyncGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The ConsumerGroup.
+ * @param contextThe request context.
+ * @param requestThe actual SyncGroup request.
+ * @param responseFuture The sync group response future.
+ *
+ * @return The result that contains records to append.
+ */
+private CoordinatorResult classicGroupSyncToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+SyncGroupRequestData request,
+CompletableFuture responseFuture
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+String groupId = request.groupId();
+String memberId = request.memberId();
+String instanceId = request.groupInstanceId();
+
+ConsumerGroupMember member;
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(request.memberId(), false);
+} else {
+member = group.staticMember(instanceId);
+if (member == null) {
+throw new UnknownMemberIdException(
+String.format("Member with instance id %s is not a member 
of group %s.", instanceId, groupId)
+);
+}
+throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+}
+
+throwIfMemberDoesNotUseClassicProtocol(member);
+throwIfGenerationIdOrProtocolUnmatched(
+group,
+member,
+request.generationId(),
+request.protocolType(),
+request.protocolName()
+);
+

Review Comment:
   I wanted to make it return REBALANCE_IN_PROGRESS but we seems to have no way 
to know whether a new rebalance is triggered in sync group...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-14 Thread via GitHub


dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1600311785


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3898,6 +3994,65 @@ public CoordinatorResult classicGroupSync(
 return EMPTY_RESULT;
 }
 
+/**
+ * Handle a SyncGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The ConsumerGroup.
+ * @param contextThe request context.
+ * @param requestThe actual SyncGroup request.
+ * @param responseFuture The sync group response future.
+ *
+ * @return The result that contains records to append.
+ */
+private CoordinatorResult classicGroupSyncToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+SyncGroupRequestData request,
+CompletableFuture responseFuture
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+String groupId = request.groupId();
+String memberId = request.memberId();
+String instanceId = request.groupInstanceId();
+
+ConsumerGroupMember member;
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(request.memberId(), false);
+} else {
+member = group.staticMember(instanceId);
+if (member == null) {
+throw new UnknownMemberIdException(
+String.format("Member with instance id %s is not a member 
of group %s.", instanceId, groupId)
+);
+}
+throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+}
+
+throwIfMemberDoesNotUseClassicProtocol(member);
+throwIfGenerationIdOrProtocolUnmatched(
+group,
+member,
+request.generationId(),
+request.protocolType(),
+request.protocolName()
+);
+
+cancelConsumerGroupSyncTimeout(groupId, memberId);
+//scheduleConsumerGroupSessionTimeout(groupId, memberId, 
member.classicMemberSessionTimeout());

Review Comment:
   Needs https://github.com/apache/kafka/pull/15921



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]

2024-05-14 Thread via GitHub


dongnuo123 opened a new pull request, #15954:
URL: https://github.com/apache/kafka/pull/15954

   This pr implements the sync group api for the consumer groups that are in 
the mixed mode. 
   
   In `classicGroupSyncToConsumerGroup`, the `assignedPartitions` calculated in 
the JoinGroup will be returned as the assignment in the sync response and the 
member session timeout will be rescheduled.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org