dajac commented on code in PR #21319:
URL: https://github.com/apache/kafka/pull/21319#discussion_r2699912941


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -694,6 +694,388 @@ public void testMemberCanRejoinWithEpochZeroInUnreleasedPartitionsState() {
         );
     }
 
+    @Test
+    public void testDuplicateFullHeartbeatInStableState() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 3)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        // Member is in STABLE state with epoch 100.
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(100)
+            .setPreviousMemberEpoch(99)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0, 1, 2)))
+            .build();
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
 member));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
 100, computeGroupHash(Map.of(
+            fooTopicName, fooTopicHash
+        ))));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 memberId, mkAssignment(
+            mkTopicAssignment(fooTopicId, 0, 1, 2)
+        )));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 100));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 member));
+
+        assertEquals(MemberState.STABLE, 
context.consumerGroupMemberState(groupId, memberId));
+
+        // Create full request with current epoch.
+        ConsumerGroupHeartbeatRequestData fullRequest = new 
ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId)
+            .setMemberEpoch(100)
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignor("range")
+            .setTopicPartitions(List.of(
+                new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                    .setTopicId(fooTopicId)
+                    .setPartitions(List.of(0, 1, 2))));
+
+        // First heartbeat.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result1 =
+            context.consumerGroupHeartbeat(fullRequest);
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(100)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(0, 1, 2))))),
+            result1.response()
+        );
+
+        // Duplicate heartbeat.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result2 =
+            context.consumerGroupHeartbeat(fullRequest);
+
+        // Verify duplicate produces same response with no records.
+        assertResponseEquals(result1.response(), result2.response());
+        assertEquals(List.of(), result2.records());
+        assertEquals(MemberState.STABLE, 
context.consumerGroupMemberState(groupId, memberId));
+    }
+
+    @Test
+    public void testDuplicateFullHeartbeatInUnrevokedPartitionsState() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 3)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        // Member is in UNREVOKED_PARTITIONS state with epoch 100.
+        // Target assignment is [0, 1], but member still owns [0, 1, 2].
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+            .setState(MemberState.UNREVOKED_PARTITIONS)
+            .setMemberEpoch(100)
+            .setPreviousMemberEpoch(99)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0, 1)))
+            
.setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 2)))
+            .build();
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
 member));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
 101, computeGroupHash(Map.of(
+            fooTopicName, fooTopicHash
+        ))));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 memberId, mkAssignment(
+            mkTopicAssignment(fooTopicId, 0, 1)
+        )));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 101));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 member));
+
+        assertEquals(MemberState.UNREVOKED_PARTITIONS, 
context.consumerGroupMemberState(groupId, memberId));
+
+        // Create full request with current epoch. Member still reports owning all partitions.
+        ConsumerGroupHeartbeatRequestData fullRequest = new 
ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId)
+            .setMemberEpoch(100)
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignor("range")
+            .setTopicPartitions(List.of(
+                new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                    .setTopicId(fooTopicId)
+                    .setPartitions(List.of(0, 1, 2))));
+
+        // First heartbeat.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result1 =
+            context.consumerGroupHeartbeat(fullRequest);
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(100)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(0, 1))))),
+            result1.response()
+        );
+
+        assertEquals(MemberState.UNREVOKED_PARTITIONS, 
context.consumerGroupMemberState(groupId, memberId));
+
+        // Duplicate heartbeat.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result2 =
+            context.consumerGroupHeartbeat(fullRequest);
+
+        // Verify duplicate produces same response with no records.
+        assertResponseEquals(result1.response(), result2.response());
+        assertEquals(List.of(), result2.records());
+        assertEquals(MemberState.UNREVOKED_PARTITIONS, 
context.consumerGroupMemberState(groupId, memberId));
+    }
+
+    @Test
+    public void testDuplicateFullHeartbeatInUnreleasedPartitionsState() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 3)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        // Member 1 is in UNRELEASED_PARTITIONS state with epoch 100.
+        // Member 1 has [0] assigned but target is [0, 1, 2].
+        // Member 2 still owns [1, 2] and needs to revoke them.
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setState(MemberState.UNRELEASED_PARTITIONS)
+            .setMemberEpoch(100)
+            .setPreviousMemberEpoch(99)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0)))
+            .build();
+
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setState(MemberState.UNREVOKED_PARTITIONS)
+            .setMemberEpoch(99)
+            .setPreviousMemberEpoch(98)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment())
+            
.setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 1, 
2)))
+            .build();
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
 member1));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
 member2));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
 100, computeGroupHash(Map.of(
+            fooTopicName, fooTopicHash
+        ))));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 memberId1, mkAssignment(
+            mkTopicAssignment(fooTopicId, 0, 1, 2)
+        )));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 memberId2, mkAssignment()));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 100));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 member1));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 member2));
+
+        assertEquals(MemberState.UNRELEASED_PARTITIONS, 
context.consumerGroupMemberState(groupId, memberId1));
+
+        // Create full request with current epoch.
+        ConsumerGroupHeartbeatRequestData fullRequest = new 
ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId1)
+            .setMemberEpoch(100)
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignor("range")
+            .setTopicPartitions(List.of(
+                new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                    .setTopicId(fooTopicId)
+                    .setPartitions(List.of(0))));
+
+        // First heartbeat. Member is UNRELEASED_PARTITIONS so response includes current assignment.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result1 =
+            context.consumerGroupHeartbeat(fullRequest);
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId1)
+                .setMemberEpoch(100)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(0))))),
+            result1.response()
+        );
+
+        assertEquals(MemberState.UNRELEASED_PARTITIONS, 
context.consumerGroupMemberState(groupId, memberId1));
+
+        // Duplicate heartbeat.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result2 =
+            context.consumerGroupHeartbeat(fullRequest);
+
+        // Verify duplicate produces same response with no records.
+        assertResponseEquals(result1.response(), result2.response());
+        assertEquals(List.of(), result2.records());
+        assertEquals(MemberState.UNRELEASED_PARTITIONS, 
context.consumerGroupMemberState(groupId, memberId1));
+    }
+
+    @Test
+    public void testDuplicateFullHeartbeatWithRevocationAck() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 3)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        // Member is in UNREVOKED_PARTITIONS state with epoch 100.
+        // Target assignment is [0, 1], member needs to revoke [2].
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+            .setState(MemberState.UNREVOKED_PARTITIONS)
+            .setMemberEpoch(100)
+            .setPreviousMemberEpoch(99)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0, 1)))
+            
.setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 2)))
+            .build();
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
 member));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
 101, computeGroupHash(Map.of(
+            fooTopicName, fooTopicHash
+        ))));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 memberId, mkAssignment(
+            mkTopicAssignment(fooTopicId, 0, 1)
+        )));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 101));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 member));
+
+        assertEquals(MemberState.UNREVOKED_PARTITIONS, 
context.consumerGroupMemberState(groupId, memberId));
+
+        // Create full request acknowledging revocation (only owns [0, 1]).
+        ConsumerGroupHeartbeatRequestData fullRequest = new 
ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId)
+            .setMemberEpoch(100)
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignor("range")
+            .setTopicPartitions(List.of(
+                new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                    .setTopicId(fooTopicId)
+                    .setPartitions(List.of(0, 1))));
+
+        // First heartbeat acknowledges revocation and transitions to STABLE.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result1 =
+            context.consumerGroupHeartbeat(fullRequest);
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(101)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(0, 1))))),
+            result1.response()
+        );
+
+        assertEquals(MemberState.STABLE, 
context.consumerGroupMemberState(groupId, memberId));
+
+        // Duplicate heartbeat with same request but epoch is now stale.

Review Comment:
   fixed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to