dajac commented on code in PR #15954: URL: https://github.com/apache/kafka/pull/15954#discussion_r1601000794
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1197,6 +1197,45 @@ private void throwIfClassicProtocolIsNotSupported( } } + /** + * Validates if the consumer group member uses the classic protocol. + * + * @param member The ConsumerGroupMember. + */ + private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember member) { + if (!member.useClassicProtocol()) { + throw new UnknownMemberIdException( + String.format("Member %s does not use the classic protocol.", member.memberId()) + ); + } + } + + /** + * Validates if the generation id and the protocol type from the request match those of the consumer group. + * + * @param group The ConsumerGroup. + * @param member The ConsumerGroupMember. + * @param requestGenerationId The generation id from the request. + * @param requestProtocolType The protocol type from the request. + * @param requestProtocolName The protocol name from the request. + */ + private void throwIfGenerationIdOrProtocolUnmatched( Review Comment: nit: It may be better to split this one into two methods. One to validate the generation. Another one to validate the protocol type and name. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1197,6 +1197,45 @@ private void throwIfClassicProtocolIsNotSupported( } } + /** + * Validates if the consumer group member uses the classic protocol. + * + * @param member The ConsumerGroupMember. + */ + private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember member) { + if (!member.useClassicProtocol()) { + throw new UnknownMemberIdException( + String.format("Member %s does not use the classic protocol.", member.memberId()) + ); + } + } + + /** + * Validates if the generation id and the protocol type from the request match those of the consumer group. + * + * @param group The ConsumerGroup. + * @param member The ConsumerGroupMember. + * @param requestGenerationId The generation id from the request. + * @param requestProtocolType The protocol type from the request. + * @param requestProtocolName The protocol name from the request. + */ + private void throwIfGenerationIdOrProtocolUnmatched( + ConsumerGroup group, + ConsumerGroupMember member, + int requestGenerationId, + String requestProtocolType, + String requestProtocolName + ) { + if (member.memberEpoch() != requestGenerationId) { + throw Errors.ILLEGAL_GENERATION.exception( + String.format("The request generation id %s is not equal to the group epoch %d from the consumer group %s.", Review Comment: nit: `group epoch` -> `member epoch`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1197,6 +1197,45 @@ private void throwIfClassicProtocolIsNotSupported( } } + /** + * Validates if the consumer group member uses the classic protocol. + * + * @param member The ConsumerGroupMember. + */ + private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember member) { + if (!member.useClassicProtocol()) { + throw new UnknownMemberIdException( + String.format("Member %s does not use the classic protocol.", member.memberId()) + ); + } + } + + /** + * Validates if the generation id and the protocol type from the request match those of the consumer group. + * + * @param group The ConsumerGroup. + * @param member The ConsumerGroupMember. + * @param requestGenerationId The generation id from the request. + * @param requestProtocolType The protocol type from the request. + * @param requestProtocolName The protocol name from the request. + */ + private void throwIfGenerationIdOrProtocolUnmatched( + ConsumerGroup group, + ConsumerGroupMember member, + int requestGenerationId, + String requestProtocolType, + String requestProtocolName + ) { + if (member.memberEpoch() != requestGenerationId) { + throw Errors.ILLEGAL_GENERATION.exception( + String.format("The request generation id %s is not equal to the group epoch %d from the consumer group %s.", + requestGenerationId, group.groupEpoch(), group.groupId()) Review Comment: nit: `group.groupEpoch()` -> `member.memberEpoch()`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3898,6 +3994,65 @@ public CoordinatorResult<Void, Record> classicGroupSync( return EMPTY_RESULT; } + /** + * Handle a SyncGroupRequest to a ConsumerGroup. + * + * @param group The ConsumerGroup. + * @param context The request context. + * @param request The actual SyncGroup request. + * @param responseFuture The sync group response future. + * + * @return The result that contains records to append. + */ + private CoordinatorResult<Void, Record> classicGroupSyncToConsumerGroup( + ConsumerGroup group, + RequestContext context, + SyncGroupRequestData request, + CompletableFuture<SyncGroupResponseData> responseFuture + ) throws UnknownMemberIdException, GroupIdNotFoundException { + String groupId = request.groupId(); + String memberId = request.memberId(); + String instanceId = request.groupInstanceId(); + + ConsumerGroupMember member; + if (instanceId == null) { + member = group.getOrMaybeCreateMember(request.memberId(), false); + } else { + member = group.staticMember(instanceId); + if (member == null) { + throw new UnknownMemberIdException( + String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId) + ); + } + throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); + } + + throwIfMemberDoesNotUseClassicProtocol(member); + throwIfGenerationIdOrProtocolUnmatched( + group, + member, + request.generationId(), + request.protocolType(), + request.protocolName() + ); + + cancelConsumerGroupSyncTimeout(groupId, memberId); +// scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicMemberSessionTimeout()); + + byte[] assignment = ConsumerProtocol.serializeAssignment( + new ConsumerPartitionAssignor.Assignment(toTopicPartitionList(member.assignedPartitions(), metadataImage.topics())), + deserializeProtocolVersion(member.classicMemberMetadata().get()) + ).array(); + + responseFuture.complete(new SyncGroupResponseData() + .setProtocolType(request.protocolType()) + .setProtocolName(request.protocolName()) + .setAssignment(assignment) + .setErrorCode(Errors.NONE.code())); Review Comment: This is incorrect. We also need to send the response only when the "append future" is completed. The reason is that we potentially access uncommitted state here. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3898,6 +3994,65 @@ public CoordinatorResult<Void, Record> classicGroupSync( return EMPTY_RESULT; } + /** + * Handle a SyncGroupRequest to a ConsumerGroup. + * + * @param group The ConsumerGroup. + * @param context The request context. + * @param request The actual SyncGroup request. + * @param responseFuture The sync group response future. + * + * @return The result that contains records to append. + */ + private CoordinatorResult<Void, Record> classicGroupSyncToConsumerGroup( + ConsumerGroup group, + RequestContext context, + SyncGroupRequestData request, + CompletableFuture<SyncGroupResponseData> responseFuture + ) throws UnknownMemberIdException, GroupIdNotFoundException { + String groupId = request.groupId(); + String memberId = request.memberId(); + String instanceId = request.groupInstanceId(); + + ConsumerGroupMember member; + if (instanceId == null) { + member = group.getOrMaybeCreateMember(request.memberId(), false); + } else { + member = group.staticMember(instanceId); + if (member == null) { + throw new UnknownMemberIdException( + String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId) + ); + } + throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); + } + + throwIfMemberDoesNotUseClassicProtocol(member); + throwIfGenerationIdOrProtocolUnmatched( + group, + member, + request.generationId(), + request.protocolType(), + request.protocolName() + ); + Review Comment: Hum... I wonder if we could rely on the member epoch to do this. If the member epoch is smaller than the group epoch and the member is not in unrevoked partitions state, we could return rebalance in progress. If the member epoch is smaller than the group epoch, it means that the member must rebalance to catch up. However, if the member is already in unrevoked partitions state, it means that it has already started a rebalance and it must complete it to revoke partitions. It will automatically start another one after revoking the partitions. Would something like this work? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3898,6 +3994,65 @@ public CoordinatorResult<Void, Record> classicGroupSync( return EMPTY_RESULT; } + /** + * Handle a SyncGroupRequest to a ConsumerGroup. + * + * @param group The ConsumerGroup. + * @param context The request context. + * @param request The actual SyncGroup request. + * @param responseFuture The sync group response future. + * + * @return The result that contains records to append. + */ + private CoordinatorResult<Void, Record> classicGroupSyncToConsumerGroup( + ConsumerGroup group, + RequestContext context, + SyncGroupRequestData request, + CompletableFuture<SyncGroupResponseData> responseFuture + ) throws UnknownMemberIdException, GroupIdNotFoundException { Review Comment: nit: Do we ever throw `GroupIdNotFoundException`? It would be great if we could add the remaining ones too. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -1197,6 +1197,45 @@ private void throwIfClassicProtocolIsNotSupported( } } + /** + * Validates if the consumer group member uses the classic protocol. + * + * @param member The ConsumerGroupMember. + */ + private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember member) { + if (!member.useClassicProtocol()) { + throw new UnknownMemberIdException( + String.format("Member %s does not use the classic protocol.", member.memberId()) + ); + } + } + + /** + * Validates if the generation id and the protocol type from the request match those of the consumer group. + * + * @param group The ConsumerGroup. + * @param member The ConsumerGroupMember. + * @param requestGenerationId The generation id from the request. + * @param requestProtocolType The protocol type from the request. + * @param requestProtocolName The protocol name from the request. + */ + private void throwIfGenerationIdOrProtocolUnmatched( + ConsumerGroup group, + ConsumerGroupMember member, + int requestGenerationId, + String requestProtocolType, + String requestProtocolName + ) { + if (member.memberEpoch() != requestGenerationId) { + throw Errors.ILLEGAL_GENERATION.exception( + String.format("The request generation id %s is not equal to the group epoch %d from the consumer group %s.", + requestGenerationId, group.groupEpoch(), group.groupId()) + ); + } else if (!group.supportsClassicProtocols(requestProtocolType, Collections.singleton(requestProtocolName))) { + throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("The member protocol is not supported."); + } Review Comment: This is incorrect. In the JoinGroup response, we set the protocol name to `protocols.iterator().next().name()`. Here we should validate that we get it back. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3898,6 +3994,65 @@ public CoordinatorResult<Void, Record> classicGroupSync( return EMPTY_RESULT; } + /** + * Handle a SyncGroupRequest to a ConsumerGroup. + * + * @param group The ConsumerGroup. + * @param context The request context. + * @param request The actual SyncGroup request. + * @param responseFuture The sync group response future. + * + * @return The result that contains records to append. + */ + private CoordinatorResult<Void, Record> classicGroupSyncToConsumerGroup( + ConsumerGroup group, + RequestContext context, + SyncGroupRequestData request, + CompletableFuture<SyncGroupResponseData> responseFuture + ) throws UnknownMemberIdException, GroupIdNotFoundException { + String groupId = request.groupId(); + String memberId = request.memberId(); + String instanceId = request.groupInstanceId(); + + ConsumerGroupMember member; + if (instanceId == null) { + member = group.getOrMaybeCreateMember(request.memberId(), false); + } else { + member = group.staticMember(instanceId); + if (member == null) { + throw new UnknownMemberIdException( + String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId) + ); + } + throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); + } + + throwIfMemberDoesNotUseClassicProtocol(member); + throwIfGenerationIdOrProtocolUnmatched( + group, + member, + request.generationId(), + request.protocolType(), + request.protocolName() + ); + + cancelConsumerGroupSyncTimeout(groupId, memberId); +// scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicMemberSessionTimeout()); + + byte[] assignment = ConsumerProtocol.serializeAssignment( + new ConsumerPartitionAssignor.Assignment(toTopicPartitionList(member.assignedPartitions(), metadataImage.topics())), Review Comment: Like in the join group api, I think that we could avoid the intermediate data structures here too. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3898,6 +3994,65 @@ public CoordinatorResult<Void, Record> classicGroupSync( return EMPTY_RESULT; } + /** + * Handle a SyncGroupRequest to a ConsumerGroup. + * + * @param group The ConsumerGroup. + * @param context The request context. + * @param request The actual SyncGroup request. + * @param responseFuture The sync group response future. + * + * @return The result that contains records to append. + */ + private CoordinatorResult<Void, Record> classicGroupSyncToConsumerGroup( + ConsumerGroup group, + RequestContext context, + SyncGroupRequestData request, + CompletableFuture<SyncGroupResponseData> responseFuture + ) throws UnknownMemberIdException, GroupIdNotFoundException { + String groupId = request.groupId(); + String memberId = request.memberId(); + String instanceId = request.groupInstanceId(); + + ConsumerGroupMember member; + if (instanceId == null) { + member = group.getOrMaybeCreateMember(request.memberId(), false); + } else { + member = group.staticMember(instanceId); + if (member == null) { + throw new UnknownMemberIdException( + String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId) + ); + } + throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); + } + + throwIfMemberDoesNotUseClassicProtocol(member); + throwIfGenerationIdOrProtocolUnmatched( + group, + member, + request.generationId(), + request.protocolType(), + request.protocolName() + ); + + cancelConsumerGroupSyncTimeout(groupId, memberId); +// scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicMemberSessionTimeout()); + + byte[] assignment = ConsumerProtocol.serializeAssignment( + new ConsumerPartitionAssignor.Assignment(toTopicPartitionList(member.assignedPartitions(), metadataImage.topics())), + deserializeProtocolVersion(member.classicMemberMetadata().get()) + ).array(); + + responseFuture.complete(new SyncGroupResponseData() + .setProtocolType(request.protocolType()) + .setProtocolName(request.protocolName()) + .setAssignment(assignment) + .setErrorCode(Errors.NONE.code())); Review Comment: nit: The error code is 0 by default so we don't need to set it. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3797,32 +3874,51 @@ private CoordinatorResult<Void, Record> updateStaticMemberThenRebalanceOrComplet * @param request The actual SyncGroup request. * @param responseFuture The sync group response future. * - * @return The result that contains records to append if the group metadata manager received assignments. + * @return The result that contains records to append. */ public CoordinatorResult<Void, Record> classicGroupSync( RequestContext context, SyncGroupRequestData request, CompletableFuture<SyncGroupResponseData> responseFuture ) throws UnknownMemberIdException, GroupIdNotFoundException { - String groupId = request.groupId(); - String memberId = request.memberId(); - ClassicGroup group; - try { - group = getOrMaybeCreateClassicGroup(groupId, false); - } catch (Throwable t) { + Group group = groups.get(request.groupId(), Long.MAX_VALUE); + + if (group == null || group.isEmpty()) { responseFuture.complete(new SyncGroupResponseData() - .setErrorCode(Errors.forException(t).code()) - ); + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())); return EMPTY_RESULT; } + if (group.type() == CLASSIC) { + return classicGroupSyncToClassicGroup((ClassicGroup) group, context, request, responseFuture); + } else { + return classicGroupSyncToConsumerGroup((ConsumerGroup) group, context, request, responseFuture); + } + } + + /** + * Handle a SyncGroupRequest to a ClassicGroup. + * + * @param group The ClassicGroup. + * @param context The request context. + * @param request The actual SyncGroup request. + * @param responseFuture The sync group response future. + * + * @return The result that contains records to append if the group metadata manager received assignments. + */ + public CoordinatorResult<Void, Record> classicGroupSyncToClassicGroup( Review Comment: nit: Could it be private? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -11885,16 +11945,352 @@ public void testReconciliationInJoiningConsumerGroupWithCooperativeProtocol() th assertEquals(expectedMember3.state(), group.getOrMaybeCreateMember(memberId1, false).state()); joinResult3.appendFuture.complete(null); + JoinGroupResponseData joinResponse3 = joinResult3.joinFuture.get(); assertEquals( new JoinGroupResponseData() .setMemberId(memberId1) .setGenerationId(11) .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) .setProtocolName("range"), - joinResult3.joinFuture.get() + joinResponse3 ); context.assertSessionTimeout(groupId, memberId1, request3.sessionTimeoutMs()); context.assertSyncTimeout(groupId, memberId1, request3.rebalanceTimeoutMs()); + + // Member 1 sends sync request to get the assigned partitions. + testClassicGroupSyncToConsumerGroup( + context, + groupId, + joinResponse3.memberId(), + joinResponse3.generationId(), + joinResponse3.protocolName(), + joinResponse3.protocolType(), + Arrays.asList( + new TopicPartition(fooTopicName, 0), + new TopicPartition(fooTopicName, 1), + new TopicPartition(zarTopicName, 0) + ) + ); + } + + @Test + public void testClassicGroupSyncToConsumerGroupWithAllConsumerProtocolVersions() throws Exception { + String groupId = "group-id"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + for (short version = ConsumerProtocolAssignment.LOWEST_SUPPORTED_VERSION; version <= ConsumerProtocolAssignment.HIGHEST_SUPPORTED_VERSION; version++) { + List<TopicPartition> topicPartitions = Arrays.asList( + new TopicPartition(fooTopicName, 0), + new TopicPartition(fooTopicName, 1), + new TopicPartition(fooTopicName, 2), + new TopicPartition(barTopicName, 0), + new TopicPartition(barTopicName, 1) + ); + + List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = Collections.singletonList( + new ConsumerGroupMemberMetadataValue.ClassicProtocol() + .setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( + new ConsumerPartitionAssignor.Subscription( + Arrays.asList(fooTopicName, barTopicName), + null, + topicPartitions + ), + version + ))) + ); + + ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setClassicMemberMetadata( + new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() +// .setClassicMemberSessionTimeout(5000) Review Comment: Don't forget this one. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -11885,16 +11945,352 @@ public void testReconciliationInJoiningConsumerGroupWithCooperativeProtocol() th assertEquals(expectedMember3.state(), group.getOrMaybeCreateMember(memberId1, false).state()); joinResult3.appendFuture.complete(null); + JoinGroupResponseData joinResponse3 = joinResult3.joinFuture.get(); assertEquals( new JoinGroupResponseData() .setMemberId(memberId1) .setGenerationId(11) .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) .setProtocolName("range"), - joinResult3.joinFuture.get() + joinResponse3 ); context.assertSessionTimeout(groupId, memberId1, request3.sessionTimeoutMs()); context.assertSyncTimeout(groupId, memberId1, request3.rebalanceTimeoutMs()); + + // Member 1 sends sync request to get the assigned partitions. + testClassicGroupSyncToConsumerGroup( + context, + groupId, + joinResponse3.memberId(), + joinResponse3.generationId(), + joinResponse3.protocolName(), + joinResponse3.protocolType(), + Arrays.asList( + new TopicPartition(fooTopicName, 0), + new TopicPartition(fooTopicName, 1), + new TopicPartition(zarTopicName, 0) + ) + ); + } + + @Test + public void testClassicGroupSyncToConsumerGroupWithAllConsumerProtocolVersions() throws Exception { + String groupId = "group-id"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + for (short version = ConsumerProtocolAssignment.LOWEST_SUPPORTED_VERSION; version <= ConsumerProtocolAssignment.HIGHEST_SUPPORTED_VERSION; version++) { + List<TopicPartition> topicPartitions = Arrays.asList( + new TopicPartition(fooTopicName, 0), + new TopicPartition(fooTopicName, 1), + new TopicPartition(fooTopicName, 2), + new TopicPartition(barTopicName, 0), + new TopicPartition(barTopicName, 1) + ); + + List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = Collections.singletonList( + new ConsumerGroupMemberMetadataValue.ClassicProtocol() + .setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( + new ConsumerPartitionAssignor.Subscription( + Arrays.asList(fooTopicName, barTopicName), + null, + topicPartitions + ), + version + ))) + ); + + ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setClassicMemberMetadata( + new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() +// .setClassicMemberSessionTimeout(5000) + .setSupportedProtocols(protocols) + ) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .build(); + ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .build(); + + // Consumer group with two members. + // Member 1 uses the classic protocol and member 2 uses the consumer protocol. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) + .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(member1) + .withMember(member2) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) + .withAssignmentEpoch(10)) + .build(); + + testClassicGroupSyncToConsumerGroup( + context, + groupId, + memberId1, + 10, + "range", + ConsumerProtocol.PROTOCOL_TYPE, + topicPartitions, + version + ); + } + } + + @Test + public void testClassicGroupSyncToConsumerGroupWithUnknownMemberId() throws Exception { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + + // Consumer group with a member that doesn't use the classic protocol. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) + .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .build())) + .build(); + + // Request with unknown member id. + assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupSync( + new GroupMetadataManagerTestContext.SyncGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(Uuid.randomUuid().toString()) + .withGenerationId(10) + .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE) + .withProtocolType("range") + .build()) + ); + + // Request with unknown instance id. + assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupSync( + new GroupMetadataManagerTestContext.SyncGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId) + .withGroupInstanceId("unknown-instance-id") + .withGenerationId(10) + .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE) + .withProtocolType("range") + .build()) + ); + + // Request with member id that doesn't use the classic protocol. + assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupSync( + new GroupMetadataManagerTestContext.SyncGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId) + .withGenerationId(10) + .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE) + .withProtocolType("range") + .build()) + ); + } + + @Test + public void testClassicGroupSyncToConsumerGroupWithFencedInstanceId() throws Exception { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + String instanceId = "instance-id"; + + // Consumer group with a static member. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) + .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setInstanceId(instanceId) + .build())) + .build(); + + assertThrows(FencedInstanceIdException.class, () -> context.sendClassicGroupSync( + new GroupMetadataManagerTestContext.SyncGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(Uuid.randomUuid().toString()) + .withGroupInstanceId(instanceId) + .withGenerationId(10) + .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE) + .withProtocolType("range") + .build()) + ); + } + + @Test + public void testClassicGroupSyncToConsumerGroupWithInconsistentGroupProtocol() throws Exception { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + + List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = Collections.singletonList( + new ConsumerGroupMemberMetadataValue.ClassicProtocol() + .setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( + new ConsumerPartitionAssignor.Subscription( + Arrays.asList("foo"), + null, + Collections.emptyList() + ) + ))) + ); + + // Consumer group with a static member. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) + .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setClassicMemberMetadata( + new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() +// .setClassicMemberSessionTimeout(5000) + .setSupportedProtocols(protocols) + ) + .setMemberEpoch(10) + .build())) + .build(); + + assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupSync( + new GroupMetadataManagerTestContext.SyncGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId) + .withGenerationId(10) + .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE) + .withProtocolType("roundrobin") + .build()) + ); + + assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupSync( + new GroupMetadataManagerTestContext.SyncGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId) + .withGenerationId(10) + .withProtocolName("connect") + .withProtocolType("range") + .build()) + ); + } + + @Test + public void testClassicGroupSyncToConsumerGroupWithIllegalGeneration() throws Exception { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + + List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = Collections.singletonList( + new ConsumerGroupMemberMetadataValue.ClassicProtocol() + .setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( + new ConsumerPartitionAssignor.Subscription( + Arrays.asList("foo"), + null, + Collections.emptyList() + ) + ))) + ); + + // Consumer group with a static member. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) + .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setClassicMemberMetadata( + new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() +// .setClassicMemberSessionTimeout(5000) + .setSupportedProtocols(protocols) + ) + .setMemberEpoch(10) + .build())) + .build(); + + assertThrows(IllegalGenerationException.class, () -> context.sendClassicGroupSync( + new GroupMetadataManagerTestContext.SyncGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(memberId) + .withGenerationId(9) + .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE) + .withProtocolType("range") + .build()) + ); + } + + private void testClassicGroupSyncToConsumerGroup( + GroupMetadataManagerTestContext context, Review Comment: Should we move this method and the next one to the `GroupMetadataManagerTestContext`? Having the context as the first argument usually signals that the methods should be in the context itself. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org