dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1602768290


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1197,6 +1199,82 @@ private void throwIfClassicProtocolIsNotSupported(
         }
     }
 
+    /**
+     * Validates if the consumer group member uses the classic protocol.
+     *
+     * @param member The ConsumerGroupMember.
+     */
+    private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember 
member) {
+        if (!member.useClassicProtocol()) {
+            throw new UnknownMemberIdException(
+                String.format("Member %s does not use the classic protocol.", 
member.memberId())
+            );
+        }
+    }
+
+    /**
+     * Validates if the generation id from the request matches the member 
epoch.
+     *
+     * @param groupId               The ConsumerGroup id.
+     * @param memberEpoch           The member epoch.
+     * @param requestGenerationId   The generation id from the request.
+     */
+    private void throwIfGenerationIdUnmatched(
+        String groupId,

Review Comment:
   nit: I think that we could remove the group id here as the caller knows its 
context.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1197,6 +1199,82 @@ private void throwIfClassicProtocolIsNotSupported(
         }
     }
 
+    /**
+     * Validates if the consumer group member uses the classic protocol.
+     *
+     * @param member The ConsumerGroupMember.
+     */
+    private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember 
member) {
+        if (!member.useClassicProtocol()) {
+            throw new UnknownMemberIdException(
+                String.format("Member %s does not use the classic protocol.", 
member.memberId())
+            );
+        }
+    }
+
+    /**
+     * Validates if the generation id from the request matches the member 
epoch.
+     *
+     * @param groupId               The ConsumerGroup id.
+     * @param memberEpoch           The member epoch.
+     * @param requestGenerationId   The generation id from the request.
+     */
+    private void throwIfGenerationIdUnmatched(
+        String groupId,
+        int memberEpoch,
+        int requestGenerationId
+    ) {
+        if (memberEpoch != requestGenerationId) {
+            throw Errors.ILLEGAL_GENERATION.exception(
+                String.format("The request generation id %s is not equal to 
the member epoch %d from the consumer group %s.",
+                    requestGenerationId, memberEpoch, groupId)
+            );
+        }
+    }
+
+    /**
+     * Validates if the protocol type and the protocol name from the request 
matches those of the consumer group.
+     *
+     * @param member                The ConsumerGroupMember.
+     * @param requestProtocolType   The protocol type from the request.
+     * @param requestProtocolName   The protocol name from the request.
+     */
+    private void throwIfClassicProtocolUnmatched(
+        ConsumerGroupMember member,
+        String requestProtocolType,
+        String requestProtocolName
+    ) {
+        if (!ConsumerProtocol.PROTOCOL_TYPE.equals(requestProtocolType)) {
+            throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception(
+                String.format("The protocol type %s from member %s request is 
not equal to the group protocol type %s.",
+                    requestProtocolType, member.memberId(), 
ConsumerProtocol.PROTOCOL_TYPE)
+            );
+        } else if 
(!member.supportedClassicProtocols().get().iterator().next().name().equals(requestProtocolName))
 {

Review Comment:
   nit: Should we extract 
`member.supportedClassicProtocols().get().iterator().next().name()` into a 
variable? I also assume that the supported classic protocols should always be 
set at this point. Is that the case?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3931,6 +4066,65 @@ public CoordinatorResult<Void, CoordinatorRecord> 
classicGroupSync(
         return EMPTY_RESULT;
     }
 
+    /**
+     * Handle a SyncGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual SyncGroup request.
+     * @param responseFuture The sync group response future.
+     *
+     * @return The result that contains records to append.
+     */
+    private CoordinatorResult<Void, CoordinatorRecord> 
classicGroupSyncToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, FencedInstanceIdException, 
IllegalGenerationException,
+        InconsistentGroupProtocolException, RebalanceInProgressException, 
IllegalStateException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        String instanceId = request.groupInstanceId();
+
+        ConsumerGroupMember member;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(request.memberId(), false);
+        } else {
+            member = group.staticMember(instanceId);
+            if (member == null) {
+                throw new UnknownMemberIdException(
+                    String.format("Member with instance id %s is not a member 
of group %s.", instanceId, groupId)
+                );
+            }
+            throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+        }
+
+        throwIfMemberDoesNotUseClassicProtocol(member);
+        throwIfGenerationIdUnmatched(groupId, member.memberEpoch(), 
request.generationId());
+        throwIfClassicProtocolUnmatched(member, request.protocolType(), 
request.protocolName());
+

Review Comment:
   nit: Should we remove this empty line?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -11996,16 +12055,295 @@ public void 
testReconciliationInJoiningConsumerGroupWithCooperativeProtocol() th
         assertEquals(expectedMember3.state(), 
group.getOrMaybeCreateMember(memberId1, false).state());
 
         joinResult3.appendFuture.complete(null);
+        JoinGroupResponseData joinResponse3 = joinResult3.joinFuture.get();
         assertEquals(
             new JoinGroupResponseData()
                 .setMemberId(memberId1)
                 .setGenerationId(11)
                 .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
                 .setProtocolName("range"),
-            joinResult3.joinFuture.get()
+            joinResponse3
         );
         context.assertSessionTimeout(groupId, memberId1, 
request3.sessionTimeoutMs());
         context.assertSyncTimeout(groupId, memberId1, 
request3.rebalanceTimeoutMs());
+
+        // Member 1 sends sync request to get the assigned partitions.
+        context.verifyClassicGroupSyncToConsumerGroup(
+            groupId,
+            joinResponse3.memberId(),
+            joinResponse3.generationId(),
+            joinResponse3.protocolName(),
+            joinResponse3.protocolType(),
+            Arrays.asList(
+                new TopicPartition(fooTopicName, 0),
+                new TopicPartition(fooTopicName, 1),
+                new TopicPartition(zarTopicName, 0)
+            )
+        );
+    }
+
+    @Test
+    public void 
testClassicGroupSyncToConsumerGroupWithAllConsumerProtocolVersions() throws 
Exception {
+        String groupId = "group-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        for (short version = 
ConsumerProtocolAssignment.LOWEST_SUPPORTED_VERSION; version <= 
ConsumerProtocolAssignment.HIGHEST_SUPPORTED_VERSION; version++) {
+            List<TopicPartition> topicPartitions = Arrays.asList(
+                new TopicPartition(fooTopicName, 0),
+                new TopicPartition(fooTopicName, 1),
+                new TopicPartition(fooTopicName, 2),
+                new TopicPartition(barTopicName, 0),
+                new TopicPartition(barTopicName, 1)
+            );
+
+            List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = 
Collections.singletonList(
+                new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                    .setName("range")
+                    
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+                        new ConsumerPartitionAssignor.Subscription(
+                            Arrays.asList(fooTopicName, barTopicName),
+                            null,
+                            topicPartitions
+                        ),
+                        version
+                    )))
+            );
+
+            ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+                .setState(MemberState.STABLE)
+                .setMemberEpoch(10)
+                .setPreviousMemberEpoch(9)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setClassicMemberMetadata(
+                    new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                        .setSessionTimeoutMs(5000)
+                        .setSupportedProtocols(protocols)
+                )
+                .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .build();
+            ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+                .setState(MemberState.STABLE)
+                .setMemberEpoch(10)
+                .setPreviousMemberEpoch(9)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .build();
+
+            // Consumer group with two members.
+            // Member 1 uses the classic protocol and member 2 uses the 
consumer protocol.
+            GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+                
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+                .withAssignors(Collections.singletonList(new 
MockPartitionAssignor("range")))
+                .withMetadataImage(new MetadataImageBuilder()
+                    .addTopic(fooTopicId, fooTopicName, 6)
+                    .addTopic(barTopicId, barTopicName, 3)
+                    .addRacks()
+                    .build())
+                .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                    .withMember(member1)
+                    .withMember(member2)
+                    .withAssignment(memberId1, mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0, 1)))
+                    .withAssignment(memberId2, mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)))
+                    .withAssignmentEpoch(10))
+                .build();
+
+            context.verifyClassicGroupSyncToConsumerGroup(
+                groupId,
+                memberId1,
+                10,
+                "range",
+                ConsumerProtocol.PROTOCOL_TYPE,
+                topicPartitions,
+                version
+            );
+        }
+    }
+
+    @Test
+    public void testClassicGroupSyncToConsumerGroupWithUnknownMemberId() 
throws Exception {
+        String groupId = "group-id";
+        String memberId = Uuid.randomUuid().toString();
+
+        // Consumer group with a member that doesn't use the classic protocol.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+            .withAssignors(Collections.singletonList(new 
MockPartitionAssignor("range")))
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .build()))
+            .build();
+
+        // Request with unknown member id.
+        assertThrows(UnknownMemberIdException.class, () -> 
context.sendClassicGroupSync(
+            new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+                .withGroupId(groupId)
+                .withMemberId(Uuid.randomUuid().toString())
+                .withGenerationId(10)
+                .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE)
+                .withProtocolType("range")
+                .build())
+        );
+
+        // Request with unknown instance id.
+        assertThrows(UnknownMemberIdException.class, () -> 
context.sendClassicGroupSync(
+            new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+                .withGroupId(groupId)
+                .withMemberId(memberId)
+                .withGroupInstanceId("unknown-instance-id")
+                .withGenerationId(10)
+                .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE)
+                .withProtocolType("range")
+                .build())
+        );
+
+        // Request with member id that doesn't use the classic protocol.
+        assertThrows(UnknownMemberIdException.class, () -> 
context.sendClassicGroupSync(
+            new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+                .withGroupId(groupId)
+                .withMemberId(memberId)
+                .withGenerationId(10)
+                .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE)
+                .withProtocolType("range")
+                .build())
+        );
+    }
+
+    @Test
+    public void testClassicGroupSyncToConsumerGroupWithFencedInstanceId() 
throws Exception {
+        String groupId = "group-id";
+        String memberId = Uuid.randomUuid().toString();
+        String instanceId = "instance-id";
+
+        // Consumer group with a static member.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+            .withAssignors(Collections.singletonList(new 
MockPartitionAssignor("range")))
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setInstanceId(instanceId)
+                    .build()))
+            .build();
+
+        assertThrows(FencedInstanceIdException.class, () -> 
context.sendClassicGroupSync(
+            new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+                .withGroupId(groupId)
+                .withMemberId(Uuid.randomUuid().toString())
+                .withGroupInstanceId(instanceId)
+                .withGenerationId(10)
+                .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE)
+                .withProtocolType("range")
+                .build())
+        );
+    }
+
+    @Test
+    public void 
testClassicGroupSyncToConsumerGroupWithInconsistentGroupProtocol() throws 
Exception {
+        String groupId = "group-id";
+        String memberId = Uuid.randomUuid().toString();
+
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = 
Collections.singletonList(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+                    new ConsumerPartitionAssignor.Subscription(
+                        Arrays.asList("foo"),
+                        null,
+                        Collections.emptyList()
+                    )
+                )))
+        );
+
+        // Consumer group with a static member.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+            .withAssignors(Collections.singletonList(new 
MockPartitionAssignor("range")))
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setClassicMemberMetadata(
+                        new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                            .setSessionTimeoutMs(5000)
+                            .setSupportedProtocols(protocols)
+                    )
+                    .setMemberEpoch(10)
+                    .build()))
+            .build();
+
+        assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupSync(
+            new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+                .withGroupId(groupId)
+                .withMemberId(memberId)
+                .withGenerationId(10)
+                .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE)
+                .withProtocolType("roundrobin")
+                .build())
+        );
+
+        assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupSync(
+            new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+                .withGroupId(groupId)
+                .withMemberId(memberId)
+                .withGenerationId(10)
+                .withProtocolName("connect")
+                .withProtocolType("range")
+                .build())
+        );
+    }
+
+    @Test
+    public void testClassicGroupSyncToConsumerGroupWithIllegalGeneration() 
throws Exception {
+        String groupId = "group-id";
+        String memberId = Uuid.randomUuid().toString();
+
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = 
Collections.singletonList(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+                    new ConsumerPartitionAssignor.Subscription(
+                        Arrays.asList("foo"),
+                        null,
+                        Collections.emptyList()
+                    )
+                )))
+        );
+
+        // Consumer group with a static member.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+            .withAssignors(Collections.singletonList(new 
MockPartitionAssignor("range")))
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setClassicMemberMetadata(
+                        new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                            .setSessionTimeoutMs(5000)
+                            .setSupportedProtocols(protocols)
+                    )
+                    .setMemberEpoch(10)
+                    .build()))
+            .build();
+
+        assertThrows(IllegalGenerationException.class, () -> 
context.sendClassicGroupSync(
+            new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+                .withGroupId(groupId)
+                .withMemberId(memberId)
+                .withGenerationId(9)
+                .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE)
+                .withProtocolType("range")
+                .build())
+        );

Review Comment:
   Should we add a test to validate the rebalance-in-progress error 
(`RebalanceInProgressException`)?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -11996,16 +12055,295 @@ public void 
testReconciliationInJoiningConsumerGroupWithCooperativeProtocol() th
         assertEquals(expectedMember3.state(), 
group.getOrMaybeCreateMember(memberId1, false).state());
 
         joinResult3.appendFuture.complete(null);
+        JoinGroupResponseData joinResponse3 = joinResult3.joinFuture.get();
         assertEquals(
             new JoinGroupResponseData()
                 .setMemberId(memberId1)
                 .setGenerationId(11)
                 .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
                 .setProtocolName("range"),
-            joinResult3.joinFuture.get()
+            joinResponse3
         );
         context.assertSessionTimeout(groupId, memberId1, 
request3.sessionTimeoutMs());
         context.assertSyncTimeout(groupId, memberId1, 
request3.rebalanceTimeoutMs());
+
+        // Member 1 sends sync request to get the assigned partitions.
+        context.verifyClassicGroupSyncToConsumerGroup(
+            groupId,
+            joinResponse3.memberId(),
+            joinResponse3.generationId(),
+            joinResponse3.protocolName(),
+            joinResponse3.protocolType(),
+            Arrays.asList(
+                new TopicPartition(fooTopicName, 0),
+                new TopicPartition(fooTopicName, 1),
+                new TopicPartition(zarTopicName, 0)
+            )
+        );
+    }
+
+    @Test
+    public void 
testClassicGroupSyncToConsumerGroupWithAllConsumerProtocolVersions() throws 
Exception {
+        String groupId = "group-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        for (short version = 
ConsumerProtocolAssignment.LOWEST_SUPPORTED_VERSION; version <= 
ConsumerProtocolAssignment.HIGHEST_SUPPORTED_VERSION; version++) {
+            List<TopicPartition> topicPartitions = Arrays.asList(
+                new TopicPartition(fooTopicName, 0),
+                new TopicPartition(fooTopicName, 1),
+                new TopicPartition(fooTopicName, 2),
+                new TopicPartition(barTopicName, 0),
+                new TopicPartition(barTopicName, 1)
+            );
+
+            List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = 
Collections.singletonList(
+                new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                    .setName("range")
+                    
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+                        new ConsumerPartitionAssignor.Subscription(
+                            Arrays.asList(fooTopicName, barTopicName),
+                            null,
+                            topicPartitions
+                        ),
+                        version
+                    )))
+            );
+
+            ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+                .setState(MemberState.STABLE)
+                .setMemberEpoch(10)
+                .setPreviousMemberEpoch(9)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setClassicMemberMetadata(
+                    new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                        .setSessionTimeoutMs(5000)
+                        .setSupportedProtocols(protocols)
+                )
+                .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .build();
+            ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+                .setState(MemberState.STABLE)
+                .setMemberEpoch(10)
+                .setPreviousMemberEpoch(9)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .build();
+
+            // Consumer group with two members.
+            // Member 1 uses the classic protocol and member 2 uses the 
consumer protocol.
+            GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+                
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+                .withAssignors(Collections.singletonList(new 
MockPartitionAssignor("range")))
+                .withMetadataImage(new MetadataImageBuilder()
+                    .addTopic(fooTopicId, fooTopicName, 6)
+                    .addTopic(barTopicId, barTopicName, 3)
+                    .addRacks()
+                    .build())
+                .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                    .withMember(member1)
+                    .withMember(member2)
+                    .withAssignment(memberId1, mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0, 1)))
+                    .withAssignment(memberId2, mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)))
+                    .withAssignmentEpoch(10))
+                .build();
+
+            context.verifyClassicGroupSyncToConsumerGroup(
+                groupId,
+                memberId1,
+                10,
+                "range",
+                ConsumerProtocol.PROTOCOL_TYPE,
+                topicPartitions,
+                version
+            );
+        }
+    }
+
+    @Test
+    public void testClassicGroupSyncToConsumerGroupWithUnknownMemberId() 
throws Exception {
+        String groupId = "group-id";
+        String memberId = Uuid.randomUuid().toString();
+
+        // Consumer group with a member that doesn't use the classic protocol.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+            .withAssignors(Collections.singletonList(new 
MockPartitionAssignor("range")))
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .build()))
+            .build();
+
+        // Request with unknown member id.
+        assertThrows(UnknownMemberIdException.class, () -> 
context.sendClassicGroupSync(
+            new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+                .withGroupId(groupId)
+                .withMemberId(Uuid.randomUuid().toString())
+                .withGenerationId(10)
+                .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE)
+                .withProtocolType("range")
+                .build())
+        );
+
+        // Request with unknown instance id.
+        assertThrows(UnknownMemberIdException.class, () -> 
context.sendClassicGroupSync(
+            new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+                .withGroupId(groupId)
+                .withMemberId(memberId)
+                .withGroupInstanceId("unknown-instance-id")
+                .withGenerationId(10)
+                .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE)
+                .withProtocolType("range")
+                .build())
+        );
+
+        // Request with member id that doesn't use the classic protocol.
+        assertThrows(UnknownMemberIdException.class, () -> 
context.sendClassicGroupSync(
+            new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+                .withGroupId(groupId)
+                .withMemberId(memberId)
+                .withGenerationId(10)
+                .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE)
+                .withProtocolType("range")
+                .build())
+        );
+    }
+
+    @Test
+    public void testClassicGroupSyncToConsumerGroupWithFencedInstanceId() 
throws Exception {
+        String groupId = "group-id";
+        String memberId = Uuid.randomUuid().toString();
+        String instanceId = "instance-id";
+
+        // Consumer group with a static member.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+            .withAssignors(Collections.singletonList(new 
MockPartitionAssignor("range")))
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setInstanceId(instanceId)
+                    .build()))
+            .build();
+
+        assertThrows(FencedInstanceIdException.class, () -> 
context.sendClassicGroupSync(
+            new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+                .withGroupId(groupId)
+                .withMemberId(Uuid.randomUuid().toString())
+                .withGroupInstanceId(instanceId)
+                .withGenerationId(10)
+                .withProtocolName(ConsumerProtocol.PROTOCOL_TYPE)
+                .withProtocolType("range")
+                .build())
+        );
+    }
+
+    @Test
+    public void 
testClassicGroupSyncToConsumerGroupWithInconsistentGroupProtocol() throws 
Exception {
+        String groupId = "group-id";
+        String memberId = Uuid.randomUuid().toString();
+
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = 
Collections.singletonList(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+                    new ConsumerPartitionAssignor.Subscription(
+                        Arrays.asList("foo"),
+                        null,
+                        Collections.emptyList()
+                    )
+                )))
+        );
+
+        // Consumer group with a static member.

Review Comment:
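   static? The member built in this test does not set an instance id, so the 
comment "Consumer group with a static member." looks inaccurate here.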



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1197,6 +1199,82 @@ private void throwIfClassicProtocolIsNotSupported(
         }
     }
 
+    /**
+     * Validates if the consumer group member uses the classic protocol.
+     *
+     * @param member The ConsumerGroupMember.
+     */
+    private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember 
member) {
+        if (!member.useClassicProtocol()) {
+            throw new UnknownMemberIdException(
+                String.format("Member %s does not use the classic protocol.", 
member.memberId())
+            );
+        }
+    }
+
+    /**
+     * Validates if the generation id from the request matches the member 
epoch.
+     *
+     * @param groupId               The ConsumerGroup id.
+     * @param memberEpoch           The member epoch.
+     * @param requestGenerationId   The generation id from the request.
+     */
+    private void throwIfGenerationIdUnmatched(
+        String groupId,
+        int memberEpoch,
+        int requestGenerationId
+    ) {
+        if (memberEpoch != requestGenerationId) {
+            throw Errors.ILLEGAL_GENERATION.exception(
+                String.format("The request generation id %s is not equal to 
the member epoch %d from the consumer group %s.",
+                    requestGenerationId, memberEpoch, groupId)
+            );
+        }
+    }
+
+    /**
+     * Validates if the protocol type and the protocol name from the request 
matches those of the consumer group.
+     *
+     * @param member                The ConsumerGroupMember.
+     * @param requestProtocolType   The protocol type from the request.
+     * @param requestProtocolName   The protocol name from the request.
+     */
+    private void throwIfClassicProtocolUnmatched(
+        ConsumerGroupMember member,
+        String requestProtocolType,
+        String requestProtocolName
+    ) {
+        if (!ConsumerProtocol.PROTOCOL_TYPE.equals(requestProtocolType)) {
+            throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception(
+                String.format("The protocol type %s from member %s request is 
not equal to the group protocol type %s.",
+                    requestProtocolType, member.memberId(), 
ConsumerProtocol.PROTOCOL_TYPE)
+            );
+        } else if 
(!member.supportedClassicProtocols().get().iterator().next().name().equals(requestProtocolName))
 {
+            throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception(
+                String.format("The protocol name %s from member %s request is 
not equal to the protocol name %s returned in the join response.",
+                    requestProtocolName, member.memberId(), 
member.supportedClassicProtocols().get().iterator().next().name())
+            );
+        }
+    }
+
+    /**
+     * Validates if a new rebalance has been triggered and the member should 
rejoin to catch up.
+     *
+     * @param group     The ConsumerGroup.
+     * @param member    The ConsumerGroupMember.
+     */
+    private void throwIfRebalanceInProgress(
+        ConsumerGroup group,
+        ConsumerGroupMember member
+    ) {
+        if (group.groupEpoch() > member.memberEpoch() && 
!member.state().equals(MemberState.UNREVOKED_PARTITIONS)) {

Review Comment:
   nit: This logic deserves a comment to explain the reasoning. What do you 
think?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3931,6 +4066,65 @@ public CoordinatorResult<Void, CoordinatorRecord> 
classicGroupSync(
         return EMPTY_RESULT;
     }
 
+    /**
+     * Handle a SyncGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual SyncGroup request.
+     * @param responseFuture The sync group response future.
+     *
+     * @return The result that contains records to append.

Review Comment:
   nit: Do we generate any records? If not, we should update this comment.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3931,6 +4066,65 @@ public CoordinatorResult<Void, CoordinatorRecord> 
classicGroupSync(
         return EMPTY_RESULT;
     }
 
+    /**
+     * Handle a SyncGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual SyncGroup request.
+     * @param responseFuture The sync group response future.
+     *
+     * @return The result that contains records to append.
+     */
+    private CoordinatorResult<Void, CoordinatorRecord> 
classicGroupSyncToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, FencedInstanceIdException, 
IllegalGenerationException,
+        InconsistentGroupProtocolException, RebalanceInProgressException, 
IllegalStateException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        String instanceId = request.groupInstanceId();
+
+        ConsumerGroupMember member;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(request.memberId(), false);
+        } else {
+            member = group.staticMember(instanceId);
+            if (member == null) {
+                throw new UnknownMemberIdException(
+                    String.format("Member with instance id %s is not a member 
of group %s.", instanceId, groupId)
+                );
+            }
+            throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+        }
+
+        throwIfMemberDoesNotUseClassicProtocol(member);
+        throwIfGenerationIdUnmatched(groupId, member.memberEpoch(), 
request.generationId());
+        throwIfClassicProtocolUnmatched(member, request.protocolType(), 
request.protocolName());
+
+        throwIfRebalanceInProgress(group, member);
+
+        CompletableFuture<Void> appendFuture = new CompletableFuture<>();
+        appendFuture.whenComplete((__, t) -> {
+            if (t == null) {
+                cancelConsumerGroupSyncTimeout(groupId, memberId);
+                scheduleConsumerGroupSessionTimeout(groupId, memberId, 
member.classicProtocolSessionTimeout().get());
+
+                responseFuture.complete(new SyncGroupResponseData()
+                    .setProtocolType(request.protocolType())
+                    .setProtocolName(request.protocolName())
+                    .setAssignment(ConsumerProtocol.serializeAssignment(
+                        new 
ConsumerPartitionAssignor.Assignment(toTopicPartitionList(member.assignedPartitions(),
 metadataImage.topics())),
+                        
deserializeProtocolVersion(member.classicMemberMetadata().get())
+                    ).array()));

Review Comment:
   nit: It may be worth having a helper method which prepares the desired 
assignment: `.setAssignment(prepareAssignment(member, metadataImage.topics()))`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to