Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]
dajac merged PR #15954: URL: https://github.com/apache/kafka/pull/15954 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]
dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603595149

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
         String requestProtocolType,
         String requestProtocolName
     ) {
+        String protocolName = member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
You may be able to reuse `isProtocolInconsistent`.
Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603586413

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
         String requestProtocolType,
         String requestProtocolName
     ) {
+        String protocolName = member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
Ah now I get it.. Let me add the check
Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603582917

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
         String requestProtocolType,
         String requestProtocolName
     ) {
+        String protocolName = member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
Yes, if `requestProtocolType` or `requestProtocolName` is null, the validation will fail.

> we should only validate them if they are non-null.

We do validate them even if they are null in the existing classic group sync to the classic group
https://github.com/apache/kafka/blob/056d232f4e28bf8e67e00f8ed2c103fdb0f3b78e/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L4335-L4342
so I think we should also fail the validation if either of them is null.
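The difference between the two positions in this exchange (reject a null protocol type or name, versus validate only non-null values) can be sketched as below. This is a minimal illustration, not the Kafka code: the class and method names are hypothetical, the member's protocol name is passed in as a plain string, and `PROTOCOL_TYPE` is assumed to hold the same value as `ConsumerProtocol.PROTOCOL_TYPE` ("consumer").

```java
// Hypothetical stand-in for the validation discussed above; not the Kafka implementation.
final class SyncProtocolCheckSketch {
    // Assumed to mirror ConsumerProtocol.PROTOCOL_TYPE.
    static final String PROTOCOL_TYPE = "consumer";

    /** Strict variant: a null type or name never equals the expected value, so it is reported as a mismatch. */
    static boolean strictMismatch(String memberProtocolName, String requestType, String requestName) {
        return !PROTOCOL_TYPE.equals(requestType) || !memberProtocolName.equals(requestName);
    }

    /** Lenient variant: null fields are skipped and only non-null fields are compared. */
    static boolean lenientMismatch(String memberProtocolName, String requestType, String requestName) {
        boolean typeMismatch = requestType != null && !PROTOCOL_TYPE.equals(requestType);
        boolean nameMismatch = requestName != null && !memberProtocolName.equals(requestName);
        return typeMismatch || nameMismatch;
    }

    public static void main(String[] args) {
        // A request that omits the protocol type.
        System.out.println(strictMismatch("range", null, "range"));  // prints "true"
        System.out.println(lenientMismatch("range", null, "range")); // prints "false"
    }
}
```

The strict variant corresponds to the behaviour described in the comment above, where a null field fails the validation; the lenient variant is what "only validate them if they are non-null" would look like.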
Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603566552

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
         String requestProtocolType,
         String requestProtocolName
     ) {
+        String protocolName = member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
Ah right, they should be validated
Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]
dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603556220

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
         String requestProtocolType,
         String requestProtocolName
     ) {
+        String protocolName = member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
What I said is different. What I meant is that requestProtocolType and requestProtocolName could be null here. If they are, the validation will fail, no?
Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603545254

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
         String requestProtocolType,
         String requestProtocolName
     ) {
+        String protocolName = member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
I'm not sure if it's necessary as we've checked if the member uses the classic protocol before, but I guess it doesn't hurt to explicitly check the protocols here. Let me add the validation..
Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]
dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603535727

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -3931,6 +4071,74 @@ public CoordinatorResult classicGroupSync(
         return EMPTY_RESULT;
     }
+    /**
+     * Handle a SyncGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual SyncGroup request.
+     * @param responseFuture The sync group response future.
+     *
+     * @return The result that contains the appendFuture to return the response.
+     */
+    private CoordinatorResult classicGroupSyncToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture responseFuture
+    ) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException,
+        InconsistentGroupProtocolException, RebalanceInProgressException, IllegalStateException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        String instanceId = request.groupInstanceId();
+
+        ConsumerGroupMember member;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(request.memberId(), false);
+        } else {
+            member = group.staticMember(instanceId);
+            if (member == null) {
+                throw new UnknownMemberIdException(
+                    String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId)
+                );
+            }
+            throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+        }
+
+        throwIfMemberDoesNotUseClassicProtocol(member);
+        throwIfGenerationIdUnmatched(member.memberId(), member.memberEpoch(), request.generationId());
+        throwIfClassicProtocolUnmatched(member, request.protocolType(), request.protocolName());
+        throwIfRebalanceInProgress(group, member);
+
+        CompletableFuture appendFuture = new CompletableFuture<>();
+        appendFuture.whenComplete((__, t) -> {
+            if (t == null) {
+                cancelConsumerGroupSyncTimeout(groupId, memberId);
+                scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicProtocolSessionTimeout().get());
+
+                responseFuture.complete(new SyncGroupResponseData()
+                    .setProtocolType(request.protocolType())
+                    .setProtocolName(request.protocolName())
+                    .setAssignment(prepareAssignment(member)));
+            }
+        });
+
+        return new CoordinatorResult<>(Collections.emptyList(), appendFuture, false);
+    }
+
+    /**
+     * Serializes the member's assigned partitions with ConsumerProtocol.
+     *
+     * @param member The ConsumerGroupMember.
+     * @return The serialized assigned partitions.
+     */
+    private byte[] prepareAssignment(ConsumerGroupMember member) {

Review Comment:
Now that we have this method, I wonder if we should inline the body of `deserializeProtocolVersion` here. I don't think that we will reuse deserializeProtocolVersion.
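The hunk above completes the sync response from a callback on `appendFuture` rather than immediately. The pattern can be sketched in isolation as follows. This is a minimal sketch under stated assumptions: `DeferredSyncResponseSketch`, `wireResponse` and the `String` response are hypothetical stand-ins (the real code uses `SyncGroupResponseData` and hands the future to a `CoordinatorResult`), and only `java.util.concurrent.CompletableFuture` is used.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the deferred-response pattern; not the Kafka implementation.
final class DeferredSyncResponseSketch {
    /**
     * Returns a future that the caller completes once the (possibly empty) append is done.
     * The response is completed only if that future completes without an error.
     */
    static CompletableFuture<Void> wireResponse(CompletableFuture<String> responseFuture, String response) {
        CompletableFuture<Void> appendFuture = new CompletableFuture<>();
        appendFuture.whenComplete((ignored, error) -> {
            if (error == null) {
                responseFuture.complete(response);
            }
        });
        return appendFuture;
    }

    public static void main(String[] args) {
        CompletableFuture<String> response = new CompletableFuture<>();
        CompletableFuture<Void> append = wireResponse(response, "assignment-for-member");

        System.out.println(response.isDone()); // prints "false": nothing has been acknowledged yet
        append.complete(null);                 // simulate the append being acknowledged
        System.out.println(response.isDone()); // prints "true"
    }
}
```

As in the hunk, the sketch does nothing with the response when the append fails; how that error reaches the client is outside what this snippet shows.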
Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]
dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603525427

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -1244,15 +1244,16 @@ private void throwIfClassicProtocolUnmatched(
         String requestProtocolType,
         String requestProtocolName
     ) {
+        String protocolName = member.supportedClassicProtocols().get().iterator().next().name();

Review Comment:
I just recalled that the protocol type and name are optional in the sync group request so we should only validate them if they are non-null.
Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1603521402

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -1197,6 +1199,82 @@ private void throwIfClassicProtocolIsNotSupported(
         }
     }
+    /**
+     * Validates if the consumer group member uses the classic protocol.
+     *
+     * @param member The ConsumerGroupMember.
+     */
+    private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember member) {
+        if (!member.useClassicProtocol()) {
+            throw new UnknownMemberIdException(
+                String.format("Member %s does not use the classic protocol.", member.memberId())
+            );
+        }
+    }
+
+    /**
+     * Validates if the generation id from the request matches the member epoch.
+     *
+     * @param groupId             The ConsumerGroup id.
+     * @param memberEpoch         The member epoch.
+     * @param requestGenerationId The generation id from the request.
+     */
+    private void throwIfGenerationIdUnmatched(
+        String groupId,
+        int memberEpoch,
+        int requestGenerationId
+    ) {
+        if (memberEpoch != requestGenerationId) {
+            throw Errors.ILLEGAL_GENERATION.exception(
+                String.format("The request generation id %s is not equal to the member epoch %d from the consumer group %s.",
+                    requestGenerationId, memberEpoch, groupId)
+            );
+        }
+    }
+
+    /**
+     * Validates if the protocol type and the protocol name from the request matches those of the consumer group.
+     *
+     * @param member              The ConsumerGroupMember.
+     * @param requestProtocolType The protocol type from the request.
+     * @param requestProtocolName The protocol name from the request.
+     */
+    private void throwIfClassicProtocolUnmatched(
+        ConsumerGroupMember member,
+        String requestProtocolType,
+        String requestProtocolName
+    ) {
+        if (!ConsumerProtocol.PROTOCOL_TYPE.equals(requestProtocolType)) {
+            throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception(
+                String.format("The protocol type %s from member %s request is not equal to the group protocol type %s.",
+                    requestProtocolType, member.memberId(), ConsumerProtocol.PROTOCOL_TYPE)
+            );
+        } else if (!member.supportedClassicProtocols().get().iterator().next().name().equals(requestProtocolName)) {

Review Comment:
Yes it's always set here
Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]
dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1602768290

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -1197,6 +1199,82 @@ private void throwIfClassicProtocolIsNotSupported(
         }
     }
+    /**
+     * Validates if the consumer group member uses the classic protocol.
+     *
+     * @param member The ConsumerGroupMember.
+     */
+    private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember member) {
+        if (!member.useClassicProtocol()) {
+            throw new UnknownMemberIdException(
+                String.format("Member %s does not use the classic protocol.", member.memberId())
+            );
+        }
+    }
+
+    /**
+     * Validates if the generation id from the request matches the member epoch.
+     *
+     * @param groupId             The ConsumerGroup id.
+     * @param memberEpoch         The member epoch.
+     * @param requestGenerationId The generation id from the request.
+     */
+    private void throwIfGenerationIdUnmatched(
+        String groupId,

Review Comment:
nit: I think that we could remove the group id here as the caller knows its context.

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -1197,6 +1199,82 @@ private void throwIfClassicProtocolIsNotSupported(
         }
     }
+    /**
+     * Validates if the consumer group member uses the classic protocol.
+     *
+     * @param member The ConsumerGroupMember.
+     */
+    private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember member) {
+        if (!member.useClassicProtocol()) {
+            throw new UnknownMemberIdException(
+                String.format("Member %s does not use the classic protocol.", member.memberId())
+            );
+        }
+    }
+
+    /**
+     * Validates if the generation id from the request matches the member epoch.
+     *
+     * @param groupId             The ConsumerGroup id.
+     * @param memberEpoch         The member epoch.
+     * @param requestGenerationId The generation id from the request.
+     */
+    private void throwIfGenerationIdUnmatched(
+        String groupId,
+        int memberEpoch,
+        int requestGenerationId
+    ) {
+        if (memberEpoch != requestGenerationId) {
+            throw Errors.ILLEGAL_GENERATION.exception(
+                String.format("The request generation id %s is not equal to the member epoch %d from the consumer group %s.",
+                    requestGenerationId, memberEpoch, groupId)
+            );
+        }
+    }
+
+    /**
+     * Validates if the protocol type and the protocol name from the request matches those of the consumer group.
+     *
+     * @param member              The ConsumerGroupMember.
+     * @param requestProtocolType The protocol type from the request.
+     * @param requestProtocolName The protocol name from the request.
+     */
+    private void throwIfClassicProtocolUnmatched(
+        ConsumerGroupMember member,
+        String requestProtocolType,
+        String requestProtocolName
+    ) {
+        if (!ConsumerProtocol.PROTOCOL_TYPE.equals(requestProtocolType)) {
+            throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception(
+                String.format("The protocol type %s from member %s request is not equal to the group protocol type %s.",
+                    requestProtocolType, member.memberId(), ConsumerProtocol.PROTOCOL_TYPE)
+            );
+        } else if (!member.supportedClassicProtocols().get().iterator().next().name().equals(requestProtocolName)) {

Review Comment:
nit: Should we extract `member.supportedClassicProtocols().get().iterator().next().name()` into a variable? I also assume that it should always be set at this point. Is it the case?

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -3931,6 +4066,65 @@ public CoordinatorResult classicGroupSync(
         return EMPTY_RESULT;
     }
+    /**
+     * Handle a SyncGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual SyncGroup request.
+     * @param responseFuture The sync group response future.
+     *
+     * @return The result that contains records to append.
+     */
+    private CoordinatorResult classicGroupSyncToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture responseFuture
+    ) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException,
+        InconsistentGroupProtocolException, RebalanceInProgressException, IllegalStateException {
+        String groupId = request.groupId();
+        String memberId
Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1602027889

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -3898,6 +3994,65 @@ public CoordinatorResult classicGroupSync(
         return EMPTY_RESULT;
     }
+    /**
+     * Handle a SyncGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual SyncGroup request.
+     * @param responseFuture The sync group response future.
+     *
+     * @return The result that contains records to append.
+     */
+    private CoordinatorResult classicGroupSyncToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        String instanceId = request.groupInstanceId();
+
+        ConsumerGroupMember member;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(request.memberId(), false);
+        } else {
+            member = group.staticMember(instanceId);
+            if (member == null) {
+                throw new UnknownMemberIdException(
+                    String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId)
+                );
+            }
+            throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+        }
+
+        throwIfMemberDoesNotUseClassicProtocol(member);
+        throwIfGenerationIdOrProtocolUnmatched(
+            group,
+            member,
+            request.generationId(),
+            request.protocolType(),
+            request.protocolName()
+        );
+

Review Comment:
Yeah! I was stuck in how to determine if the member has already rejoined but checking the state makes sense
Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1601951216

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -3898,6 +3994,65 @@ public CoordinatorResult classicGroupSync(
         return EMPTY_RESULT;
     }
+    /**
+     * Handle a SyncGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual SyncGroup request.
+     * @param responseFuture The sync group response future.
+     *
+     * @return The result that contains records to append.
+     */
+    private CoordinatorResult classicGroupSyncToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        String instanceId = request.groupInstanceId();
+
+        ConsumerGroupMember member;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(request.memberId(), false);
+        } else {
+            member = group.staticMember(instanceId);
+            if (member == null) {
+                throw new UnknownMemberIdException(
+                    String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId)
+                );
+            }
+            throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+        }
+
+        throwIfMemberDoesNotUseClassicProtocol(member);
+        throwIfGenerationIdOrProtocolUnmatched(
+            group,
+            member,
+            request.generationId(),
+            request.protocolType(),
+            request.protocolName()
+        );
+
+        cancelConsumerGroupSyncTimeout(groupId, memberId);
+        //scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicMemberSessionTimeout());
+
+        byte[] assignment = ConsumerProtocol.serializeAssignment(
+            new ConsumerPartitionAssignor.Assignment(toTopicPartitionList(member.assignedPartitions(), metadataImage.topics())),
+            deserializeProtocolVersion(member.classicMemberMetadata().get())
+        ).array();
+
+        responseFuture.complete(new SyncGroupResponseData()
+            .setProtocolType(request.protocolType())
+            .setProtocolName(request.protocolName())
+            .setAssignment(assignment)
+            .setErrorCode(Errors.NONE.code()));

Review Comment:
Ah yes, you're right. I thought there's nothing to commit for EMPTY_RESULT
Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]
dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1601000794

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -1197,6 +1197,45 @@ private void throwIfClassicProtocolIsNotSupported(
         }
     }
+    /**
+     * Validates if the consumer group member uses the classic protocol.
+     *
+     * @param member The ConsumerGroupMember.
+     */
+    private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember member) {
+        if (!member.useClassicProtocol()) {
+            throw new UnknownMemberIdException(
+                String.format("Member %s does not use the classic protocol.", member.memberId())
+            );
+        }
+    }
+
+    /**
+     * Validates if the generation id and the protocol type from the request match those of the consumer group.
+     *
+     * @param group               The ConsumerGroup.
+     * @param member              The ConsumerGroupMember.
+     * @param requestGenerationId The generation id from the request.
+     * @param requestProtocolType The protocol type from the request.
+     * @param requestProtocolName The protocol name from the request.
+     */
+    private void throwIfGenerationIdOrProtocolUnmatched(

Review Comment:
nit: It may be better to split this one into two methods. One to validate the generation. Another one to validate the protocol type and name.

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -1197,6 +1197,45 @@ private void throwIfClassicProtocolIsNotSupported(
         }
     }
+    /**
+     * Validates if the consumer group member uses the classic protocol.
+     *
+     * @param member The ConsumerGroupMember.
+     */
+    private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember member) {
+        if (!member.useClassicProtocol()) {
+            throw new UnknownMemberIdException(
+                String.format("Member %s does not use the classic protocol.", member.memberId())
+            );
+        }
+    }
+
+    /**
+     * Validates if the generation id and the protocol type from the request match those of the consumer group.
+     *
+     * @param group               The ConsumerGroup.
+     * @param member              The ConsumerGroupMember.
+     * @param requestGenerationId The generation id from the request.
+     * @param requestProtocolType The protocol type from the request.
+     * @param requestProtocolName The protocol name from the request.
+     */
+    private void throwIfGenerationIdOrProtocolUnmatched(
+        ConsumerGroup group,
+        ConsumerGroupMember member,
+        int requestGenerationId,
+        String requestProtocolType,
+        String requestProtocolName
+    ) {
+        if (member.memberEpoch() != requestGenerationId) {
+            throw Errors.ILLEGAL_GENERATION.exception(
+                String.format("The request generation id %s is not equal to the group epoch %d from the consumer group %s.",

Review Comment:
nit: `group epoch` -> `member epoch`.

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -1197,6 +1197,45 @@ private void throwIfClassicProtocolIsNotSupported(
         }
     }
+    /**
+     * Validates if the consumer group member uses the classic protocol.
+     *
+     * @param member The ConsumerGroupMember.
+     */
+    private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember member) {
+        if (!member.useClassicProtocol()) {
+            throw new UnknownMemberIdException(
+                String.format("Member %s does not use the classic protocol.", member.memberId())
+            );
+        }
+    }
+
+    /**
+     * Validates if the generation id and the protocol type from the request match those of the consumer group.
+     *
+     * @param group               The ConsumerGroup.
+     * @param member              The ConsumerGroupMember.
+     * @param requestGenerationId The generation id from the request.
+     * @param requestProtocolType The protocol type from the request.
+     * @param requestProtocolName The protocol name from the request.
+     */
+    private void throwIfGenerationIdOrProtocolUnmatched(
+        ConsumerGroup group,
+        ConsumerGroupMember member,
+        int requestGenerationId,
+        String requestProtocolType,
+        String requestProtocolName
+    ) {
+        if (member.memberEpoch() != requestGenerationId) {
+            throw Errors.ILLEGAL_GENERATION.exception(
+                String.format("The request generation id %s is not equal to the group epoch %d from the consumer group %s.",
+                    requestGenerationId, group.groupEpoch(), group.groupId())

Review Comment:
nit: `group.groupEpoch()` -> `member.memberEpoch()`.

## group-coordina
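The nits above (split the generation check into its own method, and report the member epoch rather than the group epoch) could look roughly like the following. This is a minimal sketch, not the merged code: `GenerationCheckSketch` and `IllegalGenerationSketchException` are hypothetical stand-ins so the snippet compiles without Kafka on the classpath, whereas the hunks in this thread throw via `Errors.ILLEGAL_GENERATION.exception(...)`.

```java
// Hypothetical stand-in for the standalone generation check; not the Kafka implementation.
final class GenerationCheckSketch {
    /** Stand-in for the exception produced by Errors.ILLEGAL_GENERATION in the real code. */
    static final class IllegalGenerationSketchException extends RuntimeException {
        IllegalGenerationSketchException(String message) {
            super(message);
        }
    }

    /**
     * Compares the request generation id with the member epoch and reports
     * the value that was actually compared (the member epoch) in the message.
     */
    static void throwIfGenerationIdUnmatched(String memberId, int memberEpoch, int requestGenerationId) {
        if (memberEpoch != requestGenerationId) {
            throw new IllegalGenerationSketchException(String.format(
                "The request generation id %d is not equal to the member epoch %d of member %s.",
                requestGenerationId, memberEpoch, memberId));
        }
    }

    public static void main(String[] args) {
        throwIfGenerationIdUnmatched("member-1", 5, 5); // matching epoch: returns normally
        try {
            throwIfGenerationIdUnmatched("member-1", 5, 4);
        } catch (IllegalGenerationSketchException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Keeping the compared value and the reported value identical avoids the mismatch flagged in the review, where the condition used `member.memberEpoch()` but the message printed `group.groupEpoch()`.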
Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]
dajac commented on code in PR #15954: URL: https://github.com/apache/kafka/pull/15954#discussion_r1600493799 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -3898,6 +3994,65 @@ public CoordinatorResult classicGroupSync( return EMPTY_RESULT; } +/** + * Handle a SyncGroupRequest to a ConsumerGroup. + * + * @param group The ConsumerGroup. + * @param contextThe request context. + * @param requestThe actual SyncGroup request. + * @param responseFuture The sync group response future. + * + * @return The result that contains records to append. + */ +private CoordinatorResult classicGroupSyncToConsumerGroup( +ConsumerGroup group, +RequestContext context, +SyncGroupRequestData request, +CompletableFuture responseFuture +) throws UnknownMemberIdException, GroupIdNotFoundException { +String groupId = request.groupId(); +String memberId = request.memberId(); +String instanceId = request.groupInstanceId(); + +ConsumerGroupMember member; +if (instanceId == null) { +member = group.getOrMaybeCreateMember(request.memberId(), false); +} else { +member = group.staticMember(instanceId); +if (member == null) { +throw new UnknownMemberIdException( +String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId) +); +} +throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); +} + +throwIfMemberDoesNotUseClassicProtocol(member); +throwIfGenerationIdOrProtocolUnmatched( +group, +member, +request.generationId(), +request.protocolType(), +request.protocolName() +); + +cancelConsumerGroupSyncTimeout(groupId, memberId); +//scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicMemberSessionTimeout()); Review Comment: I just merged it! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1600314641

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -3898,6 +3994,65 @@ public CoordinatorResult classicGroupSync(
         return EMPTY_RESULT;
     }

+    /**
+     * Handle a SyncGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual SyncGroup request.
+     * @param responseFuture The sync group response future.
+     *
+     * @return The result that contains records to append.
+     */
+    private CoordinatorResult classicGroupSyncToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        String instanceId = request.groupInstanceId();
+
+        ConsumerGroupMember member;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(request.memberId(), false);
+        } else {
+            member = group.staticMember(instanceId);
+            if (member == null) {
+                throw new UnknownMemberIdException(
+                    String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId)
+                );
+            }
+            throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+        }
+
+        throwIfMemberDoesNotUseClassicProtocol(member);
+        throwIfGenerationIdOrProtocolUnmatched(
+            group,
+            member,
+            request.generationId(),
+            request.protocolType(),
+            request.protocolName()
+        );
+

Review Comment:
I wanted to make it return REBALANCE_IN_PROGRESS, but we seem to have no way to know whether a new rebalance is triggered in sync group...
Re: [PR] KAFKA-16762: SyncGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1600311785

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -3898,6 +3994,65 @@ public CoordinatorResult classicGroupSync(
         return EMPTY_RESULT;
     }

+    /**
+     * Handle a SyncGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual SyncGroup request.
+     * @param responseFuture The sync group response future.
+     *
+     * @return The result that contains records to append.
+     */
+    private CoordinatorResult classicGroupSyncToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        String instanceId = request.groupInstanceId();
+
+        ConsumerGroupMember member;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(request.memberId(), false);
+        } else {
+            member = group.staticMember(instanceId);
+            if (member == null) {
+                throw new UnknownMemberIdException(
+                    String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId)
+                );
+            }
+            throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+        }
+
+        throwIfMemberDoesNotUseClassicProtocol(member);
+        throwIfGenerationIdOrProtocolUnmatched(
+            group,
+            member,
+            request.generationId(),
+            request.protocolType(),
+            request.protocolName()
+        );
+
+        cancelConsumerGroupSyncTimeout(groupId, memberId);
+        //scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicMemberSessionTimeout());

Review Comment:
Needs https://github.com/apache/kafka/pull/15921