dongnuo123 commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1574897141


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -10331,6 +10333,461 @@ public void 
testClassicGroupOnUnloadedCompletingRebalance() throws Exception {
             .setErrorCode(NOT_COORDINATOR.code()), 
pendingMemberSyncResult.syncFuture.get());
     }
 
+    @Test
+    public void testLastClassicProtocolMemberLeavingConsumerGroup() {
+        String groupId = "group-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+        Uuid zarTopicId = Uuid.randomUuid();
+        String zarTopicName = "zar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = 
Collections.singletonList(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                    Arrays.asList(fooTopicName, barTopicName),
+                    null,
+                    Arrays.asList(
+                        new TopicPartition(fooTopicName, 0),
+                        new TopicPartition(fooTopicName, 1),
+                        new TopicPartition(fooTopicName, 2),
+                        new TopicPartition(barTopicName, 0),
+                        new TopicPartition(barTopicName, 1)
+                    )
+                ))))
+        );
+
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setClassicMemberMetadata(new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                .setSupportedProtocols(protocols))
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2),
+                mkTopicAssignment(barTopicId, 0, 1)))
+            .build();
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            // Use zar only here to ensure that metadata needs to be 
recomputed.
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5),
+                mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        // Consumer group with two members.
+        // Member 1 uses the classic protocol and member 2 uses the consumer 
protocol.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addTopic(zarTopicId, zarTopicName, 1)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(member1)
+                .withMember(member2)
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        context.commit();
+        ConsumerGroup consumerGroup = 
context.groupMetadataManager.consumerGroup(groupId);
+
+        // Member 2 leaves the consumer group, triggering the downgrade.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+
+        byte[] assignment = 
Utils.toArray(ConsumerProtocol.serializeAssignment(new 
ConsumerPartitionAssignor.Assignment(Arrays.asList(
+            new TopicPartition(fooTopicName, 0), new 
TopicPartition(fooTopicName, 1), new TopicPartition(fooTopicName, 2),
+            new TopicPartition(barTopicName, 0), new 
TopicPartition(barTopicName, 1)
+        ))));
+        Map<String, byte[]> assignments = new HashMap<String, byte[]>() {
+            {
+                put(memberId1, assignment);
+            }
+        };
+
+        ClassicGroup expectedClassicGroup = new ClassicGroup(
+            new LogContext(),
+            groupId,
+            STABLE,
+            context.time,
+            context.metrics,
+            10,
+            Optional.ofNullable(ConsumerProtocol.PROTOCOL_TYPE),
+            Optional.ofNullable("range"),
+            Optional.ofNullable(memberId1),
+            Optional.of(context.time.milliseconds())
+        );
+        expectedClassicGroup.add(
+            new ClassicGroupMember(
+                memberId1,
+                Optional.ofNullable(member1.instanceId()),
+                member1.clientId(),
+                member1.clientHost(),
+                member1.rebalanceTimeoutMs(),
+                45000,
+                ConsumerProtocol.PROTOCOL_TYPE,
+                member1.supportedJoinGroupRequestProtocols(),
+                assignment
+            )
+        );
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, 
memberId2),
+            RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, 
memberId2),
+            RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, 
memberId2),
+            // Subscription metadata is recomputed because zar is no longer 
there.
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new 
HashMap<String, TopicMetadata>() {
+                {
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                }
+            }),
+            RecordHelpers.newGroupEpochRecord(groupId, 11),
+
+            RecordHelpers.newTargetAssignmentEpochTombstoneRecord(groupId),

Review Comment:
   Should we also add tombstones for all the members when deleting a group?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to