dajac commented on code in PR #15954: URL: https://github.com/apache/kafka/pull/15954#discussion_r1602768290
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1197,6 +1199,82 @@ private void throwIfClassicProtocolIsNotSupported( } } + /** + * Validates if the consumer group member uses the classic protocol. + * + * @param member The ConsumerGroupMember. + */ + private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember member) { + if (!member.useClassicProtocol()) { + throw new UnknownMemberIdException( + String.format("Member %s does not use the classic protocol.", member.memberId()) + ); + } + } + + /** + * Validates if the generation id from the request matches the member epoch. + * + * @param groupId The ConsumerGroup id. + * @param memberEpoch The member epoch. + * @param requestGenerationId The generation id from the request. + */ + private void throwIfGenerationIdUnmatched( + String groupId, Review Comment: nit: I think that we could remove the group id here as the caller knows its context. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1197,6 +1199,82 @@ private void throwIfClassicProtocolIsNotSupported( } } + /** + * Validates if the consumer group member uses the classic protocol. + * + * @param member The ConsumerGroupMember. + */ + private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember member) { + if (!member.useClassicProtocol()) { + throw new UnknownMemberIdException( + String.format("Member %s does not use the classic protocol.", member.memberId()) + ); + } + } + + /** + * Validates if the generation id from the request matches the member epoch. + * + * @param groupId The ConsumerGroup id. + * @param memberEpoch The member epoch. + * @param requestGenerationId The generation id from the request. + */ + private void throwIfGenerationIdUnmatched( + String groupId, + int memberEpoch, + int requestGenerationId + ) { + if (memberEpoch != requestGenerationId) { + throw Errors.ILLEGAL_GENERATION.exception( + String.format("The request generation id %s is not equal to the member epoch %d from the consumer group %s.", + requestGenerationId, memberEpoch, groupId) + ); + } + } + + /** + * Validates if the protocol type and the protocol name from the request matches those of the consumer group. + * + * @param member The ConsumerGroupMember. + * @param requestProtocolType The protocol type from the request. + * @param requestProtocolName The protocol name from the request. + */ + private void throwIfClassicProtocolUnmatched( + ConsumerGroupMember member, + String requestProtocolType, + String requestProtocolName + ) { + if (!ConsumerProtocol.PROTOCOL_TYPE.equals(requestProtocolType)) { + throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception( + String.format("The protocol type %s from member %s request is not equal to the group protocol type %s.", + requestProtocolType, member.memberId(), ConsumerProtocol.PROTOCOL_TYPE) + ); + } else if (!member.supportedClassicProtocols().get().iterator().next().name().equals(requestProtocolName)) { Review Comment: nit: Should we extract `member.supportedClassicProtocols().get().iterator().next().name()` into a variable? I also assume that it should always be set at this point. Is it the case? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3931,6 +4066,65 @@ public CoordinatorResult<Void, CoordinatorRecord> classicGroupSync( return EMPTY_RESULT; } + /** + * Handle a SyncGroupRequest to a ConsumerGroup. + * + * @param group The ConsumerGroup. + * @param context The request context. + * @param request The actual SyncGroup request. + * @param responseFuture The sync group response future. + * + * @return The result that contains records to append. + */ + private CoordinatorResult<Void, CoordinatorRecord> classicGroupSyncToConsumerGroup( + ConsumerGroup group, + RequestContext context, + SyncGroupRequestData request, + CompletableFuture<SyncGroupResponseData> responseFuture + ) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException, + InconsistentGroupProtocolException, RebalanceInProgressException, IllegalStateException { + String groupId = request.groupId(); + String memberId = request.memberId(); + String instanceId = request.groupInstanceId(); + + ConsumerGroupMember member; + if (instanceId == null) { + member = group.getOrMaybeCreateMember(request.memberId(), false); + } else { + member = group.staticMember(instanceId); + if (member == null) { + throw new UnknownMemberIdException( + String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId) + ); + } + throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); + } + + throwIfMemberDoesNotUseClassicProtocol(member); + throwIfGenerationIdUnmatched(groupId, member.memberEpoch(), request.generationId()); + throwIfClassicProtocolUnmatched(member, request.protocolType(), request.protocolName()); + Review Comment: nit: Should we remove this empty line? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -11996,16 +12055,295 @@ public void testReconciliationInJoiningConsumerGroupWithCooperativeProtocol() th assertEquals(expectedMember3.state(), group.getOrMaybeCreateMember(memberId1, false).state()); joinResult3.appendFuture.complete(null); + JoinGroupResponseData joinResponse3 = joinResult3.joinFuture.get(); assertEquals( new JoinGroupResponseData() .setMemberId(memberId1) .setGenerationId(11) .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) .setProtocolName("range"), - joinResult3.joinFuture.get() + joinResponse3 ); context.assertSessionTimeout(groupId, memberId1, request3.sessionTimeoutMs()); context.assertSyncTimeout(groupId, memberId1, request3.rebalanceTimeoutMs()); + + // Member 1 sends sync request to get the assigned partitions. + context.verifyClassicGroupSyncToConsumerGroup( + groupId, + joinResponse3.memberId(), + joinResponse3.generationId(), + joinResponse3.protocolName(), + joinResponse3.protocolType(), + Arrays.asList( + new TopicPartition(fooTopicName, 0), + new TopicPartition(fooTopicName, 1), + new TopicPartition(zarTopicName, 0) + ) + ); + } + + @Test + public void testClassicGroupSyncToConsumerGroupWithAllConsumerProtocolVersions() throws Exception { + String groupId = "group-id"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + for (short version = ConsumerProtocolAssignment.LOWEST_SUPPORTED_VERSION; version <= ConsumerProtocolAssignment.HIGHEST_SUPPORTED_VERSION; version++) { + List<TopicPartition> topicPartitions = Arrays.asList( + new TopicPartition(fooTopicName, 0), + new TopicPartition(fooTopicName, 1), + new TopicPartition(fooTopicName, 2), + new TopicPartition(barTopicName, 0), + new TopicPartition(barTopicName, 1) + ); + + List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = Collections.singletonList( + new ConsumerGroupMemberMetadataValue.ClassicProtocol() + .setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( + new ConsumerPartitionAssignor.Subscription( + Arrays.asList(fooTopicName, barTopicName), + null, + topicPartitions + ), + version + ))) + ); + + ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setClassicMemberMetadata( + new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() + .setSessionTimeoutMs(5000) + .setSupportedProtocols(protocols) + ) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .build(); + ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build(); + + // Consumer group with two members. + // Member 1 uses the classic protocol and member 2 uses the consumer protocol. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) + .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(member1) + .withMember(member2) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .withAssignmentEpoch(10)) + .build(); + + context.verifyClassicGroupSyncToConsumerGroup( + groupId, + memberId1, + 10, + "range", + ConsumerProtocol.PROTOCOL_TYPE, + topicPartitions, + version + ); + } + } + + @Test + public void testClassicGroupSyncToConsumerGroupWithUnknownMemberId() throws Exception { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + + // Consumer group with a member that doesn't use the classic protocol. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) + .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .build())) + .build(); + + // Request with unknown member id. + assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupSync( + new GroupMetadataManagerTestContext.SyncGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(Uuid.randomUuid().toString()) + .withGenerationId(10) + .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE) + .withProtocolType("range") + .build()) + ); + + // Request with unknown instance id. + assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupSync( + new GroupMetadataManagerTestContext.SyncGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId) + .withGroupInstanceId("unknown-instance-id") + .withGenerationId(10) + .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE) + .withProtocolType("range") + .build()) + ); + + // Request with member id that doesn't use the classic protocol. + assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupSync( + new GroupMetadataManagerTestContext.SyncGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId) + .withGenerationId(10) + .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE) + .withProtocolType("range") + .build()) + ); + } + + @Test + public void testClassicGroupSyncToConsumerGroupWithFencedInstanceId() throws Exception { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + String instanceId = "instance-id"; + + // Consumer group with a static member. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) + .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setInstanceId(instanceId) + .build())) + .build(); + + assertThrows(FencedInstanceIdException.class, () -> context.sendClassicGroupSync( + new GroupMetadataManagerTestContext.SyncGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(Uuid.randomUuid().toString()) + .withGroupInstanceId(instanceId) + .withGenerationId(10) + .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE) + .withProtocolType("range") + .build()) + ); + } + + @Test + public void testClassicGroupSyncToConsumerGroupWithInconsistentGroupProtocol() throws Exception { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + + List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = Collections.singletonList( + new ConsumerGroupMemberMetadataValue.ClassicProtocol() + .setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( + new ConsumerPartitionAssignor.Subscription( + Arrays.asList("foo"), + null, + Collections.emptyList() + ) + ))) + ); + + // Consumer group with a static member. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) + .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setClassicMemberMetadata( + new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() + .setSessionTimeoutMs(5000) + .setSupportedProtocols(protocols) + ) + .setMemberEpoch(10) + .build())) + .build(); + + assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupSync( + new GroupMetadataManagerTestContext.SyncGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId) + .withGenerationId(10) + .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE) + .withProtocolType("roundrobin") + .build()) + ); + + assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupSync( + new GroupMetadataManagerTestContext.SyncGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId) + .withGenerationId(10) + .withProtocolName("connect") + .withProtocolType("range") + .build()) + ); + } + + @Test + public void testClassicGroupSyncToConsumerGroupWithIllegalGeneration() throws Exception { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + + List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = Collections.singletonList( + new ConsumerGroupMemberMetadataValue.ClassicProtocol() + .setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( + new ConsumerPartitionAssignor.Subscription( + Arrays.asList("foo"), + null, + Collections.emptyList() + ) + ))) + ); + + // Consumer group with a static member. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) + .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setClassicMemberMetadata( + new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() + .setSessionTimeoutMs(5000) + .setSupportedProtocols(protocols) + ) + .setMemberEpoch(10) + .build())) + .build(); + + assertThrows(IllegalGenerationException.class, () -> context.sendClassicGroupSync( + new GroupMetadataManagerTestContext.SyncGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId) + .withGenerationId(9) + .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE) + .withProtocolType("range") + .build()) + ); Review Comment: Should we add a test to validate the rebalance in progress error? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -11996,16 +12055,295 @@ public void testReconciliationInJoiningConsumerGroupWithCooperativeProtocol() th assertEquals(expectedMember3.state(), group.getOrMaybeCreateMember(memberId1, false).state()); joinResult3.appendFuture.complete(null); + JoinGroupResponseData joinResponse3 = joinResult3.joinFuture.get(); assertEquals( new JoinGroupResponseData() .setMemberId(memberId1) .setGenerationId(11) .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) .setProtocolName("range"), - joinResult3.joinFuture.get() + joinResponse3 ); context.assertSessionTimeout(groupId, memberId1, request3.sessionTimeoutMs()); context.assertSyncTimeout(groupId, memberId1, request3.rebalanceTimeoutMs()); + + // Member 1 sends sync request to get the assigned partitions. + context.verifyClassicGroupSyncToConsumerGroup( + groupId, + joinResponse3.memberId(), + joinResponse3.generationId(), + joinResponse3.protocolName(), + joinResponse3.protocolType(), + Arrays.asList( + new TopicPartition(fooTopicName, 0), + new TopicPartition(fooTopicName, 1), + new TopicPartition(zarTopicName, 0) + ) + ); + } + + @Test + public void testClassicGroupSyncToConsumerGroupWithAllConsumerProtocolVersions() throws Exception { + String groupId = "group-id"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + for (short version = ConsumerProtocolAssignment.LOWEST_SUPPORTED_VERSION; version <= ConsumerProtocolAssignment.HIGHEST_SUPPORTED_VERSION; version++) { + List<TopicPartition> topicPartitions = Arrays.asList( + new TopicPartition(fooTopicName, 0), + new TopicPartition(fooTopicName, 1), + new TopicPartition(fooTopicName, 2), + new TopicPartition(barTopicName, 0), + new TopicPartition(barTopicName, 1) + ); + + List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = Collections.singletonList( + new ConsumerGroupMemberMetadataValue.ClassicProtocol() + .setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( + new ConsumerPartitionAssignor.Subscription( + Arrays.asList(fooTopicName, barTopicName), + null, + topicPartitions + ), + version + ))) + ); + + ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setClassicMemberMetadata( + new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() + .setSessionTimeoutMs(5000) + .setSupportedProtocols(protocols) + ) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .build(); + ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build(); + + // Consumer group with two members. + // Member 1 uses the classic protocol and member 2 uses the consumer protocol. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) + .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(member1) + .withMember(member2) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .withAssignmentEpoch(10)) + .build(); + + context.verifyClassicGroupSyncToConsumerGroup( + groupId, + memberId1, + 10, + "range", + ConsumerProtocol.PROTOCOL_TYPE, + topicPartitions, + version + ); + } + } + + @Test + public void testClassicGroupSyncToConsumerGroupWithUnknownMemberId() throws Exception { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + + // Consumer group with a member that doesn't use the classic protocol. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) + .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .build())) + .build(); + + // Request with unknown member id. + assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupSync( + new GroupMetadataManagerTestContext.SyncGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(Uuid.randomUuid().toString()) + .withGenerationId(10) + .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE) + .withProtocolType("range") + .build()) + ); + + // Request with unknown instance id. + assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupSync( + new GroupMetadataManagerTestContext.SyncGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId) + .withGroupInstanceId("unknown-instance-id") + .withGenerationId(10) + .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE) + .withProtocolType("range") + .build()) + ); + + // Request with member id that doesn't use the classic protocol. + assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupSync( + new GroupMetadataManagerTestContext.SyncGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId) + .withGenerationId(10) + .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE) + .withProtocolType("range") + .build()) + ); + } + + @Test + public void testClassicGroupSyncToConsumerGroupWithFencedInstanceId() throws Exception { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + String instanceId = "instance-id"; + + // Consumer group with a static member. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) + .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setInstanceId(instanceId) + .build())) + .build(); + + assertThrows(FencedInstanceIdException.class, () -> context.sendClassicGroupSync( + new GroupMetadataManagerTestContext.SyncGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(Uuid.randomUuid().toString()) + .withGroupInstanceId(instanceId) + .withGenerationId(10) + .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE) + .withProtocolType("range") + .build()) + ); + } + + @Test + public void testClassicGroupSyncToConsumerGroupWithInconsistentGroupProtocol() throws Exception { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + + List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = Collections.singletonList( + new ConsumerGroupMemberMetadataValue.ClassicProtocol() + .setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( + new ConsumerPartitionAssignor.Subscription( + Arrays.asList("foo"), + null, + Collections.emptyList() + ) + ))) + ); + + // Consumer group with a static member. Review Comment: static? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1197,6 +1199,82 @@ private void throwIfClassicProtocolIsNotSupported( } } + /** + * Validates if the consumer group member uses the classic protocol. + * + * @param member The ConsumerGroupMember. + */ + private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember member) { + if (!member.useClassicProtocol()) { + throw new UnknownMemberIdException( + String.format("Member %s does not use the classic protocol.", member.memberId()) + ); + } + } + + /** + * Validates if the generation id from the request matches the member epoch. + * + * @param groupId The ConsumerGroup id. + * @param memberEpoch The member epoch. + * @param requestGenerationId The generation id from the request. + */ + private void throwIfGenerationIdUnmatched( + String groupId, + int memberEpoch, + int requestGenerationId + ) { + if (memberEpoch != requestGenerationId) { + throw Errors.ILLEGAL_GENERATION.exception( + String.format("The request generation id %s is not equal to the member epoch %d from the consumer group %s.", + requestGenerationId, memberEpoch, groupId) + ); + } + } + + /** + * Validates if the protocol type and the protocol name from the request matches those of the consumer group. + * + * @param member The ConsumerGroupMember. + * @param requestProtocolType The protocol type from the request. + * @param requestProtocolName The protocol name from the request. + */ + private void throwIfClassicProtocolUnmatched( + ConsumerGroupMember member, + String requestProtocolType, + String requestProtocolName + ) { + if (!ConsumerProtocol.PROTOCOL_TYPE.equals(requestProtocolType)) { + throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception( + String.format("The protocol type %s from member %s request is not equal to the group protocol type %s.", + requestProtocolType, member.memberId(), ConsumerProtocol.PROTOCOL_TYPE) + ); + } else if (!member.supportedClassicProtocols().get().iterator().next().name().equals(requestProtocolName)) { + throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception( + String.format("The protocol name %s from member %s request is not equal to the protocol name %s returned in the join response.", + requestProtocolName, member.memberId(), member.supportedClassicProtocols().get().iterator().next().name()) + ); + } + } + + /** + * Validates if a new rebalance has been triggered and the member should rejoin to catch up. + * + * @param group The ConsumerGroup. + * @param member The ConsumerGroupMember. + */ + private void throwIfRebalanceInProgress( + ConsumerGroup group, + ConsumerGroupMember member + ) { + if (group.groupEpoch() > member.memberEpoch() && !member.state().equals(MemberState.UNREVOKED_PARTITIONS)) { Review Comment: nit: This logic deserves a comment to explain the reasoning. What do you think? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3931,6 +4066,65 @@ public CoordinatorResult<Void, CoordinatorRecord> classicGroupSync( return EMPTY_RESULT; } + /** + * Handle a SyncGroupRequest to a ConsumerGroup. + * + * @param group The ConsumerGroup. + * @param context The request context. + * @param request The actual SyncGroup request. + * @param responseFuture The sync group response future. + * + * @return The result that contains records to append. Review Comment: nit: Do we generate any records? If not, we should update this comment. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3931,6 +4066,65 @@ public CoordinatorResult<Void, CoordinatorRecord> classicGroupSync( return EMPTY_RESULT; } + /** + * Handle a SyncGroupRequest to a ConsumerGroup. + * + * @param group The ConsumerGroup. + * @param context The request context. + * @param request The actual SyncGroup request. + * @param responseFuture The sync group response future. + * + * @return The result that contains records to append. + */ + private CoordinatorResult<Void, CoordinatorRecord> classicGroupSyncToConsumerGroup( + ConsumerGroup group, + RequestContext context, + SyncGroupRequestData request, + CompletableFuture<SyncGroupResponseData> responseFuture + ) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException, + InconsistentGroupProtocolException, RebalanceInProgressException, IllegalStateException { + String groupId = request.groupId(); + String memberId = request.memberId(); + String instanceId = request.groupInstanceId(); + + ConsumerGroupMember member; + if (instanceId == null) { + member = group.getOrMaybeCreateMember(request.memberId(), false); + } else { + member = group.staticMember(instanceId); + if (member == null) { + throw new UnknownMemberIdException( + String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId) + ); + } + throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); + } + + throwIfMemberDoesNotUseClassicProtocol(member); + throwIfGenerationIdUnmatched(groupId, member.memberEpoch(), request.generationId()); + throwIfClassicProtocolUnmatched(member, request.protocolType(), request.protocolName()); + + throwIfRebalanceInProgress(group, member); + + CompletableFuture<Void> appendFuture = new CompletableFuture<>(); + appendFuture.whenComplete((__, t) -> { + if (t == null) { + cancelConsumerGroupSyncTimeout(groupId, memberId); + scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicProtocolSessionTimeout().get()); + + responseFuture.complete(new SyncGroupResponseData() + .setProtocolType(request.protocolType()) + .setProtocolName(request.protocolName()) + .setAssignment(ConsumerProtocol.serializeAssignment( + new ConsumerPartitionAssignor.Assignment(toTopicPartitionList(member.assignedPartitions(), metadataImage.topics())), + deserializeProtocolVersion(member.classicMemberMetadata().get()) + ).array())); Review Comment: nit: It may be worth having an helper method which prepare the desired assignment: `.setAssignment(prepareAssignment(member, metadataImage.topics())`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org