dajac commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1601000794


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1197,6 +1197,45 @@ private void throwIfClassicProtocolIsNotSupported(
         }
     }
 
+    /**
+     * Validates if the consumer group member uses the classic protocol.
+     *
+     * @param member The ConsumerGroupMember.
+     */
+    private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember 
member) {
+        if (!member.useClassicProtocol()) {
+            throw new UnknownMemberIdException(
+                String.format("Member %s does not use the classic protocol.", 
member.memberId())
+            );
+        }
+    }
+
+    /**
+     * Validates if the generation id and the protocol type from the request 
match those of the consumer group.
+     *
+     * @param group                 The ConsumerGroup.
+     * @param member                The ConsumerGroupMember.
+     * @param requestGenerationId   The generation id from the request.
+     * @param requestProtocolType   The protocol type from the request.
+     * @param requestProtocolName   The protocol name from the request.
+     */
+    private void throwIfGenerationIdOrProtocolUnmatched(

Review Comment:
   nit: It may be better to split this one into two methods: one to validate 
the generation, and another to validate the protocol type and name.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1197,6 +1197,45 @@ private void throwIfClassicProtocolIsNotSupported(
         }
     }
 
+    /**
+     * Validates if the consumer group member uses the classic protocol.
+     *
+     * @param member The ConsumerGroupMember.
+     */
+    private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember 
member) {
+        if (!member.useClassicProtocol()) {
+            throw new UnknownMemberIdException(
+                String.format("Member %s does not use the classic protocol.", 
member.memberId())
+            );
+        }
+    }
+
+    /**
+     * Validates if the generation id and the protocol type from the request 
match those of the consumer group.
+     *
+     * @param group                 The ConsumerGroup.
+     * @param member                The ConsumerGroupMember.
+     * @param requestGenerationId   The generation id from the request.
+     * @param requestProtocolType   The protocol type from the request.
+     * @param requestProtocolName   The protocol name from the request.
+     */
+    private void throwIfGenerationIdOrProtocolUnmatched(
+        ConsumerGroup group,
+        ConsumerGroupMember member,
+        int requestGenerationId,
+        String requestProtocolType,
+        String requestProtocolName
+    ) {
+        if (member.memberEpoch() != requestGenerationId) {
+            throw Errors.ILLEGAL_GENERATION.exception(
+                String.format("The request generation id %s is not equal to 
the group epoch %d from the consumer group %s.",

Review Comment:
   nit: `group epoch` -> `member epoch`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1197,6 +1197,45 @@ private void throwIfClassicProtocolIsNotSupported(
         }
     }
 
+    /**
+     * Validates if the consumer group member uses the classic protocol.
+     *
+     * @param member The ConsumerGroupMember.
+     */
+    private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember 
member) {
+        if (!member.useClassicProtocol()) {
+            throw new UnknownMemberIdException(
+                String.format("Member %s does not use the classic protocol.", 
member.memberId())
+            );
+        }
+    }
+
+    /**
+     * Validates if the generation id and the protocol type from the request 
match those of the consumer group.
+     *
+     * @param group                 The ConsumerGroup.
+     * @param member                The ConsumerGroupMember.
+     * @param requestGenerationId   The generation id from the request.
+     * @param requestProtocolType   The protocol type from the request.
+     * @param requestProtocolName   The protocol name from the request.
+     */
+    private void throwIfGenerationIdOrProtocolUnmatched(
+        ConsumerGroup group,
+        ConsumerGroupMember member,
+        int requestGenerationId,
+        String requestProtocolType,
+        String requestProtocolName
+    ) {
+        if (member.memberEpoch() != requestGenerationId) {
+            throw Errors.ILLEGAL_GENERATION.exception(
+                String.format("The request generation id %s is not equal to 
the group epoch %d from the consumer group %s.",
+                    requestGenerationId, group.groupEpoch(), group.groupId())

Review Comment:
   nit: `group.groupEpoch()` -> `member.memberEpoch()`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3898,6 +3994,65 @@ public CoordinatorResult<Void, Record> classicGroupSync(
         return EMPTY_RESULT;
     }
 
+    /**
+     * Handle a SyncGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual SyncGroup request.
+     * @param responseFuture The sync group response future.
+     *
+     * @return The result that contains records to append.
+     */
+    private CoordinatorResult<Void, Record> classicGroupSyncToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        String instanceId = request.groupInstanceId();
+
+        ConsumerGroupMember member;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(request.memberId(), false);
+        } else {
+            member = group.staticMember(instanceId);
+            if (member == null) {
+                throw new UnknownMemberIdException(
+                    String.format("Member with instance id %s is not a member 
of group %s.", instanceId, groupId)
+                );
+            }
+            throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+        }
+
+        throwIfMemberDoesNotUseClassicProtocol(member);
+        throwIfGenerationIdOrProtocolUnmatched(
+            group,
+            member,
+            request.generationId(),
+            request.protocolType(),
+            request.protocolName()
+        );
+
+        cancelConsumerGroupSyncTimeout(groupId, memberId);
+//        scheduleConsumerGroupSessionTimeout(groupId, memberId, 
member.classicMemberSessionTimeout());
+
+        byte[] assignment = ConsumerProtocol.serializeAssignment(
+            new 
ConsumerPartitionAssignor.Assignment(toTopicPartitionList(member.assignedPartitions(),
 metadataImage.topics())),
+            deserializeProtocolVersion(member.classicMemberMetadata().get())
+        ).array();
+
+        responseFuture.complete(new SyncGroupResponseData()
+            .setProtocolType(request.protocolType())
+            .setProtocolName(request.protocolName())
+            .setAssignment(assignment)
+            .setErrorCode(Errors.NONE.code()));

Review Comment:
   This is incorrect. Here too, we must send the response only once the 
"append future" has completed, because we potentially access uncommitted 
state here.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3898,6 +3994,65 @@ public CoordinatorResult<Void, Record> classicGroupSync(
         return EMPTY_RESULT;
     }
 
+    /**
+     * Handle a SyncGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual SyncGroup request.
+     * @param responseFuture The sync group response future.
+     *
+     * @return The result that contains records to append.
+     */
+    private CoordinatorResult<Void, Record> classicGroupSyncToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        String instanceId = request.groupInstanceId();
+
+        ConsumerGroupMember member;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(request.memberId(), false);
+        } else {
+            member = group.staticMember(instanceId);
+            if (member == null) {
+                throw new UnknownMemberIdException(
+                    String.format("Member with instance id %s is not a member 
of group %s.", instanceId, groupId)
+                );
+            }
+            throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+        }
+
+        throwIfMemberDoesNotUseClassicProtocol(member);
+        throwIfGenerationIdOrProtocolUnmatched(
+            group,
+            member,
+            request.generationId(),
+            request.protocolType(),
+            request.protocolName()
+        );
+

Review Comment:
   Hmm... I wonder if we could rely on the member epoch to do this. If the 
member epoch is smaller than the group epoch and the member is not in the 
unrevoked partitions state, we could return rebalance in progress: a member 
epoch smaller than the group epoch means that the member must rebalance to 
catch up. However, if the member is already in the unrevoked partitions state, 
it has already started a rebalance and must complete it in order to revoke 
its partitions. It will automatically start another one after revoking the 
partitions. Would something like this work?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3898,6 +3994,65 @@ public CoordinatorResult<Void, Record> classicGroupSync(
         return EMPTY_RESULT;
     }
 
+    /**
+     * Handle a SyncGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual SyncGroup request.
+     * @param responseFuture The sync group response future.
+     *
+     * @return The result that contains records to append.
+     */
+    private CoordinatorResult<Void, Record> classicGroupSyncToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {

Review Comment:
   nit: Do we ever throw `GroupIdNotFoundException` here? It would also be 
great if we could add the remaining exceptions that this method can throw.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1197,6 +1197,45 @@ private void throwIfClassicProtocolIsNotSupported(
         }
     }
 
+    /**
+     * Validates if the consumer group member uses the classic protocol.
+     *
+     * @param member The ConsumerGroupMember.
+     */
+    private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember 
member) {
+        if (!member.useClassicProtocol()) {
+            throw new UnknownMemberIdException(
+                String.format("Member %s does not use the classic protocol.", 
member.memberId())
+            );
+        }
+    }
+
+    /**
+     * Validates if the generation id and the protocol type from the request 
match those of the consumer group.
+     *
+     * @param group                 The ConsumerGroup.
+     * @param member                The ConsumerGroupMember.
+     * @param requestGenerationId   The generation id from the request.
+     * @param requestProtocolType   The protocol type from the request.
+     * @param requestProtocolName   The protocol name from the request.
+     */
+    private void throwIfGenerationIdOrProtocolUnmatched(
+        ConsumerGroup group,
+        ConsumerGroupMember member,
+        int requestGenerationId,
+        String requestProtocolType,
+        String requestProtocolName
+    ) {
+        if (member.memberEpoch() != requestGenerationId) {
+            throw Errors.ILLEGAL_GENERATION.exception(
+                String.format("The request generation id %s is not equal to 
the group epoch %d from the consumer group %s.",
+                    requestGenerationId, group.groupEpoch(), group.groupId())
+            );
+        } else if (!group.supportsClassicProtocols(requestProtocolType, 
Collections.singleton(requestProtocolName))) {
+            throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("The member 
protocol is not supported.");
+        }

Review Comment:
   This is incorrect. In the JoinGroup response, we set the protocol name to 
`protocols.iterator().next().name()`. Here we should validate that the request 
sends that same protocol name back.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3898,6 +3994,65 @@ public CoordinatorResult<Void, Record> classicGroupSync(
         return EMPTY_RESULT;
     }
 
+    /**
+     * Handle a SyncGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual SyncGroup request.
+     * @param responseFuture The sync group response future.
+     *
+     * @return The result that contains records to append.
+     */
+    private CoordinatorResult<Void, Record> classicGroupSyncToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        String instanceId = request.groupInstanceId();
+
+        ConsumerGroupMember member;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(request.memberId(), false);
+        } else {
+            member = group.staticMember(instanceId);
+            if (member == null) {
+                throw new UnknownMemberIdException(
+                    String.format("Member with instance id %s is not a member 
of group %s.", instanceId, groupId)
+                );
+            }
+            throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+        }
+
+        throwIfMemberDoesNotUseClassicProtocol(member);
+        throwIfGenerationIdOrProtocolUnmatched(
+            group,
+            member,
+            request.generationId(),
+            request.protocolType(),
+            request.protocolName()
+        );
+
+        cancelConsumerGroupSyncTimeout(groupId, memberId);
+//        scheduleConsumerGroupSessionTimeout(groupId, memberId, 
member.classicMemberSessionTimeout());
+
+        byte[] assignment = ConsumerProtocol.serializeAssignment(
+            new 
ConsumerPartitionAssignor.Assignment(toTopicPartitionList(member.assignedPartitions(),
 metadataImage.topics())),

Review Comment:
   Like in the join group api, I think that we could avoid the intermediate 
data structures here too.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3898,6 +3994,65 @@ public CoordinatorResult<Void, Record> classicGroupSync(
         return EMPTY_RESULT;
     }
 
+    /**
+     * Handle a SyncGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual SyncGroup request.
+     * @param responseFuture The sync group response future.
+     *
+     * @return The result that contains records to append.
+     */
+    private CoordinatorResult<Void, Record> classicGroupSyncToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        String instanceId = request.groupInstanceId();
+
+        ConsumerGroupMember member;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(request.memberId(), false);
+        } else {
+            member = group.staticMember(instanceId);
+            if (member == null) {
+                throw new UnknownMemberIdException(
+                    String.format("Member with instance id %s is not a member 
of group %s.", instanceId, groupId)
+                );
+            }
+            throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+        }
+
+        throwIfMemberDoesNotUseClassicProtocol(member);
+        throwIfGenerationIdOrProtocolUnmatched(
+            group,
+            member,
+            request.generationId(),
+            request.protocolType(),
+            request.protocolName()
+        );
+
+        cancelConsumerGroupSyncTimeout(groupId, memberId);
+//        scheduleConsumerGroupSessionTimeout(groupId, memberId, 
member.classicMemberSessionTimeout());
+
+        byte[] assignment = ConsumerProtocol.serializeAssignment(
+            new 
ConsumerPartitionAssignor.Assignment(toTopicPartitionList(member.assignedPartitions(),
 metadataImage.topics())),
+            deserializeProtocolVersion(member.classicMemberMetadata().get())
+        ).array();
+
+        responseFuture.complete(new SyncGroupResponseData()
+            .setProtocolType(request.protocolType())
+            .setProtocolName(request.protocolName())
+            .setAssignment(assignment)
+            .setErrorCode(Errors.NONE.code()));

Review Comment:
   nit: The error code is 0 by default so we don't need to set it.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3797,32 +3874,51 @@ private CoordinatorResult<Void, Record> 
updateStaticMemberThenRebalanceOrComplet
      * @param request        The actual SyncGroup request.
      * @param responseFuture The sync group response future.
      *
-     * @return The result that contains records to append if the group 
metadata manager received assignments.
+     * @return The result that contains records to append.
      */
     public CoordinatorResult<Void, Record> classicGroupSync(
         RequestContext context,
         SyncGroupRequestData request,
         CompletableFuture<SyncGroupResponseData> responseFuture
     ) throws UnknownMemberIdException, GroupIdNotFoundException {
-        String groupId = request.groupId();
-        String memberId = request.memberId();
-        ClassicGroup group;
-        try {
-            group = getOrMaybeCreateClassicGroup(groupId, false);
-        } catch (Throwable t) {
+        Group group = groups.get(request.groupId(), Long.MAX_VALUE);
+
+        if (group == null || group.isEmpty()) {
             responseFuture.complete(new SyncGroupResponseData()
-                .setErrorCode(Errors.forException(t).code())
-            );
+                .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
             return EMPTY_RESULT;
         }
 
+        if (group.type() == CLASSIC) {
+            return classicGroupSyncToClassicGroup((ClassicGroup) group, 
context, request, responseFuture);
+        } else {
+            return classicGroupSyncToConsumerGroup((ConsumerGroup) group, 
context, request, responseFuture);
+        }
+    }
+
+    /**
+     * Handle a SyncGroupRequest to a ClassicGroup.
+     *
+     * @param group          The ClassicGroup.
+     * @param context        The request context.
+     * @param request        The actual SyncGroup request.
+     * @param responseFuture The sync group response future.
+     *
+     * @return The result that contains records to append if the group 
metadata manager received assignments.
+     */
+    public CoordinatorResult<Void, Record> classicGroupSyncToClassicGroup(

Review Comment:
   nit: Could it be private?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -11885,16 +11945,352 @@ public void 
testReconciliationInJoiningConsumerGroupWithCooperativeProtocol() th
         assertEquals(expectedMember3.state(), 
group.getOrMaybeCreateMember(memberId1, false).state());
 
         joinResult3.appendFuture.complete(null);
+        JoinGroupResponseData joinResponse3 = joinResult3.joinFuture.get();
         assertEquals(
             new JoinGroupResponseData()
                 .setMemberId(memberId1)
                 .setGenerationId(11)
                 .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
                 .setProtocolName("range"),
-            joinResult3.joinFuture.get()
+            joinResponse3
         );
         context.assertSessionTimeout(groupId, memberId1, 
request3.sessionTimeoutMs());
         context.assertSyncTimeout(groupId, memberId1, 
request3.rebalanceTimeoutMs());
+
+        // Member 1 sends sync request to get the assigned partitions.
+        testClassicGroupSyncToConsumerGroup(
+            context,
+            groupId,
+            joinResponse3.memberId(),
+            joinResponse3.generationId(),
+            joinResponse3.protocolName(),
+            joinResponse3.protocolType(),
+            Arrays.asList(
+                new TopicPartition(fooTopicName, 0),
+                new TopicPartition(fooTopicName, 1),
+                new TopicPartition(zarTopicName, 0)
+            )
+        );
+    }
+
+    @Test
+    public void 
testClassicGroupSyncToConsumerGroupWithAllConsumerProtocolVersions() throws 
Exception {
+        String groupId = "group-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        for (short version = 
ConsumerProtocolAssignment.LOWEST_SUPPORTED_VERSION; version <= 
ConsumerProtocolAssignment.HIGHEST_SUPPORTED_VERSION; version++) {
+            List<TopicPartition> topicPartitions = Arrays.asList(
+                new TopicPartition(fooTopicName, 0),
+                new TopicPartition(fooTopicName, 1),
+                new TopicPartition(fooTopicName, 2),
+                new TopicPartition(barTopicName, 0),
+                new TopicPartition(barTopicName, 1)
+            );
+
+            List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = 
Collections.singletonList(
+                new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                    .setName("range")
+                    
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+                        new ConsumerPartitionAssignor.Subscription(
+                            Arrays.asList(fooTopicName, barTopicName),
+                            null,
+                            topicPartitions
+                        ),
+                        version
+                    )))
+            );
+
+            ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+                .setState(MemberState.STABLE)
+                .setMemberEpoch(10)
+                .setPreviousMemberEpoch(9)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setClassicMemberMetadata(
+                    new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+//                        .setClassicMemberSessionTimeout(5000)

Review Comment:
   Don't forget this one.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -11885,16 +11945,352 @@ public void 
testReconciliationInJoiningConsumerGroupWithCooperativeProtocol() th
         assertEquals(expectedMember3.state(), 
group.getOrMaybeCreateMember(memberId1, false).state());
 
         joinResult3.appendFuture.complete(null);
+        JoinGroupResponseData joinResponse3 = joinResult3.joinFuture.get();
         assertEquals(
             new JoinGroupResponseData()
                 .setMemberId(memberId1)
                 .setGenerationId(11)
                 .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
                 .setProtocolName("range"),
-            joinResult3.joinFuture.get()
+            joinResponse3
         );
         context.assertSessionTimeout(groupId, memberId1, 
request3.sessionTimeoutMs());
         context.assertSyncTimeout(groupId, memberId1, 
request3.rebalanceTimeoutMs());
+
+        // Member 1 sends sync request to get the assigned partitions.
+        testClassicGroupSyncToConsumerGroup(
+            context,
+            groupId,
+            joinResponse3.memberId(),
+            joinResponse3.generationId(),
+            joinResponse3.protocolName(),
+            joinResponse3.protocolType(),
+            Arrays.asList(
+                new TopicPartition(fooTopicName, 0),
+                new TopicPartition(fooTopicName, 1),
+                new TopicPartition(zarTopicName, 0)
+            )
+        );
+    }
+
+    @Test
+    public void 
testClassicGroupSyncToConsumerGroupWithAllConsumerProtocolVersions() throws 
Exception {
+        String groupId = "group-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        for (short version = 
ConsumerProtocolAssignment.LOWEST_SUPPORTED_VERSION; version <= 
ConsumerProtocolAssignment.HIGHEST_SUPPORTED_VERSION; version++) {
+            List<TopicPartition> topicPartitions = Arrays.asList(
+                new TopicPartition(fooTopicName, 0),
+                new TopicPartition(fooTopicName, 1),
+                new TopicPartition(fooTopicName, 2),
+                new TopicPartition(barTopicName, 0),
+                new TopicPartition(barTopicName, 1)
+            );
+
+            List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = 
Collections.singletonList(
+                new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                    .setName("range")
+                    
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+                        new ConsumerPartitionAssignor.Subscription(
+                            Arrays.asList(fooTopicName, barTopicName),
+                            null,
+                            topicPartitions
+                        ),
+                        version
+                    )))
+            );
+
+            ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+                .setState(MemberState.STABLE)
+                .setMemberEpoch(10)
+                .setPreviousMemberEpoch(9)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setClassicMemberMetadata(
+                    new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+//                        .setClassicMemberSessionTimeout(5000)
+                        .setSupportedProtocols(protocols)
+                )
+                .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .build();
+            ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+                .setState(MemberState.STABLE)
+                .setMemberEpoch(10)
+                .setPreviousMemberEpoch(9)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .build();
+
+            // Consumer group with two members.
+            // Member 1 uses the classic protocol and member 2 uses the 
consumer protocol.
+            GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+                
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+                .withAssignors(Collections.singletonList(new 
MockPartitionAssignor("range")))
+                .withMetadataImage(new MetadataImageBuilder()
+                    .addTopic(fooTopicId, fooTopicName, 6)
+                    .addTopic(barTopicId, barTopicName, 3)
+                    .addRacks()
+                    .build())
+                .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                    .withMember(member1)
+                    .withMember(member2)
+                    .withAssignment(memberId1, mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0, 1)))
+                    .withAssignment(memberId2, mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)))
+                    .withAssignmentEpoch(10))
+                .build();
+
+            testClassicGroupSyncToConsumerGroup(
+                context,
+                groupId,
+                memberId1,
+                10,
+                "range",
+                ConsumerProtocol.PROTOCOL_TYPE,
+                topicPartitions,
+                version
+            );
+        }
+    }
+
+    /**
+     * Verifies that a classic SyncGroup request sent to a consumer group is rejected with
+     * {@link UnknownMemberIdException} when the member id is unknown, when the instance id
+     * is unknown, or when the member was not built with classic member metadata.
+     */
+    @Test
+    public void testClassicGroupSyncToConsumerGroupWithUnknownMemberId() throws Exception {
+        String groupId = "group-id";
+        String memberId = Uuid.randomUuid().toString();
+
+        // Consumer group with a member that doesn't use the classic protocol.
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+            .withAssignors(Collections.singletonList(new MockPartitionAssignor("range")))
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .build()))
+            .build();
+
+        // Request with unknown member id. The protocol type/name are otherwise valid so that
+        // only the member lookup can fail.
+        assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupSync(
+            new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+                .withGroupId(groupId)
+                .withMemberId(Uuid.randomUuid().toString())
+                .withGenerationId(10)
+                .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                .withProtocolName("range")
+                .build())
+        );
+
+        // Request with unknown instance id.
+        assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupSync(
+            new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+                .withGroupId(groupId)
+                .withMemberId(memberId)
+                .withGroupInstanceId("unknown-instance-id")
+                .withGenerationId(10)
+                .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                .withProtocolName("range")
+                .build())
+        );
+
+        // Request with member id that doesn't use the classic protocol.
+        assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupSync(
+            new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+                .withGroupId(groupId)
+                .withMemberId(memberId)
+                .withGenerationId(10)
+                .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                .withProtocolName("range")
+                .build())
+        );
+    }
+
+    /**
+     * Verifies that a classic SyncGroup request carrying the instance id of an existing
+     * static member, but a different member id, is rejected with
+     * {@link FencedInstanceIdException}.
+     */
+    @Test
+    public void testClassicGroupSyncToConsumerGroupWithFencedInstanceId() throws Exception {
+        String groupId = "group-id";
+        String memberId = Uuid.randomUuid().toString();
+        String instanceId = "instance-id";
+
+        // Consumer group with a static member.
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+            .withAssignors(Collections.singletonList(new MockPartitionAssignor("range")))
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setInstanceId(instanceId)
+                    .build()))
+            .build();
+
+        // Same instance id, different (random) member id. The protocol type/name are
+        // otherwise valid so that only the instance id check can fail.
+        assertThrows(FencedInstanceIdException.class, () -> context.sendClassicGroupSync(
+            new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+                .withGroupId(groupId)
+                .withMemberId(Uuid.randomUuid().toString())
+                .withGroupInstanceId(instanceId)
+                .withGenerationId(10)
+                .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                .withProtocolName("range")
+                .build())
+        );
+    }
+
+    /**
+     * Verifies that a classic SyncGroup request sent to a consumer group is rejected with
+     * {@link InconsistentGroupProtocolException} when either the protocol name or the
+     * protocol type in the request does not match. Each request changes exactly one of the
+     * two so the mismatches are exercised independently.
+     */
+    @Test
+    public void testClassicGroupSyncToConsumerGroupWithInconsistentGroupProtocol() throws Exception {
+        String groupId = "group-id";
+        String memberId = Uuid.randomUuid().toString();
+
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = Collections.singletonList(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+                    new ConsumerPartitionAssignor.Subscription(
+                        Arrays.asList("foo"),
+                        null,
+                        Collections.emptyList()
+                    )
+                )))
+        );
+
+        // Consumer group with a member whose classic member metadata supports only "range".
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+            .withAssignors(Collections.singletonList(new MockPartitionAssignor("range")))
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setClassicMemberMetadata(
+                        new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                            .setSupportedProtocols(protocols)
+                    )
+                    .setMemberEpoch(10)
+                    .build()))
+            .build();
+
+        // Valid protocol type, mismatched protocol name.
+        assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupSync(
+            new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+                .withGroupId(groupId)
+                .withMemberId(memberId)
+                .withGenerationId(10)
+                .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                .withProtocolName("roundrobin")
+                .build())
+        );
+
+        // Valid protocol name, mismatched protocol type.
+        assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupSync(
+            new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+                .withGroupId(groupId)
+                .withMemberId(memberId)
+                .withGenerationId(10)
+                .withProtocolType("connect")
+                .withProtocolName("range")
+                .build())
+        );
+    }
+
+    /**
+     * Verifies that a classic SyncGroup request sent to a consumer group is rejected with
+     * {@link IllegalGenerationException} when the request's generation id (9) differs from
+     * the member epoch (10). The protocol type and name are valid so that the generation
+     * mismatch is the only possible cause of the failure.
+     */
+    @Test
+    public void testClassicGroupSyncToConsumerGroupWithIllegalGeneration() throws Exception {
+        String groupId = "group-id";
+        String memberId = Uuid.randomUuid().toString();
+
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = Collections.singletonList(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+                    new ConsumerPartitionAssignor.Subscription(
+                        Arrays.asList("foo"),
+                        null,
+                        Collections.emptyList()
+                    )
+                )))
+        );
+
+        // Consumer group with a member whose classic member metadata supports only "range".
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+            .withAssignors(Collections.singletonList(new MockPartitionAssignor("range")))
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setClassicMemberMetadata(
+                        new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                            .setSupportedProtocols(protocols)
+                    )
+                    .setMemberEpoch(10)
+                    .build()))
+            .build();
+
+        // Stale generation id with an otherwise valid protocol type and name.
+        assertThrows(IllegalGenerationException.class, () -> context.sendClassicGroupSync(
+            new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+                .withGroupId(groupId)
+                .withMemberId(memberId)
+                .withGenerationId(9)
+                .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                .withProtocolName("range")
+                .build())
+        );
+    }
+
+    private void testClassicGroupSyncToConsumerGroup(
+        GroupMetadataManagerTestContext context,

Review Comment:
   Should we move this method and the next one to the 
`GroupMetadataManagerTestContext`? Having the context as the first argument 
usually signals that the methods should be in the context itself.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to