dongnuo123 commented on code in PR #21319:
URL: https://github.com/apache/kafka/pull/21319#discussion_r2699344382
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -694,6 +694,388 @@ public void
testMemberCanRejoinWithEpochZeroInUnreleasedPartitionsState() {
);
}
+ @Test
+ public void testDuplicateFullHeartbeatInStableState() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ // Member is in STABLE state with epoch 100.
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(100)
+ .setPreviousMemberEpoch(99)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId,
0, 1, 2)))
+ .build();
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
member));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
100, computeGroupHash(Map.of(
+ fooTopicName, fooTopicHash
+ ))));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)
+ )));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
100));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member));
+
+ assertEquals(MemberState.STABLE,
context.consumerGroupMemberState(groupId, memberId));
+
+ // Create full request with current epoch.
+ ConsumerGroupHeartbeatRequestData fullRequest = new
ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(100)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignor("range")
+ .setTopicPartitions(List.of(
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1, 2))));
+
+ // First heartbeat.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result1 =
+ context.consumerGroupHeartbeat(fullRequest);
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(100)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1, 2))))),
+ result1.response()
+ );
+
+ // Duplicate heartbeat.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result2 =
+ context.consumerGroupHeartbeat(fullRequest);
+
+ // Verify duplicate produces same response with no records.
+ assertResponseEquals(result1.response(), result2.response());
+ assertEquals(List.of(), result2.records());
+ assertEquals(MemberState.STABLE,
context.consumerGroupMemberState(groupId, memberId));
+ }
+
+ @Test
+ public void testDuplicateFullHeartbeatInUnrevokedPartitionsState() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ // Member is in UNREVOKED_PARTITIONS state with epoch 100.
+ // Target assignment is [0, 1], but member still owns [0, 1, 2].
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.UNREVOKED_PARTITIONS)
+ .setMemberEpoch(100)
+ .setPreviousMemberEpoch(99)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId,
0, 1)))
+
.setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 2)))
+ .build();
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
member));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
101, computeGroupHash(Map.of(
+ fooTopicName, fooTopicHash
+ ))));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1)
+ )));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
101));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member));
+
+ assertEquals(MemberState.UNREVOKED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId));
+
+ // Create full request with current epoch. Member still reports owning
all partitions.
+ ConsumerGroupHeartbeatRequestData fullRequest = new
ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(100)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignor("range")
+ .setTopicPartitions(List.of(
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1, 2))));
+
+ // First heartbeat.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result1 =
+ context.consumerGroupHeartbeat(fullRequest);
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(100)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1))))),
+ result1.response()
+ );
+
+ assertEquals(MemberState.UNREVOKED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId));
+
+ // Duplicate heartbeat.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result2 =
+ context.consumerGroupHeartbeat(fullRequest);
+
+ // Verify duplicate produces same response with no records.
+ assertResponseEquals(result1.response(), result2.response());
+ assertEquals(List.of(), result2.records());
+ assertEquals(MemberState.UNREVOKED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId));
+ }
+
+ @Test
+ public void testDuplicateFullHeartbeatInUnreleasedPartitionsState() {
+ String groupId = "fooup";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ // Member 1 is in UNRELEASED_PARTITIONS state with epoch 100.
+ // Member 1 has [0] assigned but target is [0, 1, 2].
+ // Member 2 still owns [1, 2] and needs to revoke them.
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.UNRELEASED_PARTITIONS)
+ .setMemberEpoch(100)
+ .setPreviousMemberEpoch(99)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId,
0)))
+ .build();
+
+ ConsumerGroupMember member2 = new
ConsumerGroupMember.Builder(memberId2)
+ .setState(MemberState.UNREVOKED_PARTITIONS)
+ .setMemberEpoch(99)
+ .setPreviousMemberEpoch(98)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment())
+
.setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 1,
2)))
+ .build();
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
member1));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
member2));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
100, computeGroupHash(Map.of(
+ fooTopicName, fooTopicHash
+ ))));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)
+ )));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, mkAssignment()));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
100));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member1));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member2));
+
+ assertEquals(MemberState.UNRELEASED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId1));
+
+ // Create full request with current epoch.
+ ConsumerGroupHeartbeatRequestData fullRequest = new
ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId1)
+ .setMemberEpoch(100)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignor("range")
+ .setTopicPartitions(List.of(
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0))));
+
+ // First heartbeat. Member is UNRELEASED_PARTITIONS so response
includes current assignment.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result1 =
+ context.consumerGroupHeartbeat(fullRequest);
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId1)
+ .setMemberEpoch(100)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0))))),
+ result1.response()
+ );
+
+ assertEquals(MemberState.UNRELEASED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId1));
+
+ // Duplicate heartbeat.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result2 =
+ context.consumerGroupHeartbeat(fullRequest);
+
+ // Verify duplicate produces same response with no records.
+ assertResponseEquals(result1.response(), result2.response());
+ assertEquals(List.of(), result2.records());
+ assertEquals(MemberState.UNRELEASED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId1));
+ }
+
+ @Test
+ public void testDuplicateFullHeartbeatWithRevocationAck() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ // Member is in UNREVOKED_PARTITIONS state with epoch 100.
+ // Target assignment is [0, 1], member needs to revoke [2].
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.UNREVOKED_PARTITIONS)
+ .setMemberEpoch(100)
+ .setPreviousMemberEpoch(99)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId,
0, 1)))
+
.setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 2)))
+ .build();
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
member));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
101, computeGroupHash(Map.of(
+ fooTopicName, fooTopicHash
+ ))));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1)
+ )));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
101));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member));
+
+ assertEquals(MemberState.UNREVOKED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId));
+
+ // Create full request acknowledging revocation (only owns [0, 1]).
+ ConsumerGroupHeartbeatRequestData fullRequest = new
ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(100)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignor("range")
+ .setTopicPartitions(List.of(
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1))));
+
+ // First heartbeat acknowledges revocation and transitions to STABLE.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result1 =
+ context.consumerGroupHeartbeat(fullRequest);
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(101)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1))))),
+ result1.response()
+ );
+
+ assertEquals(MemberState.STABLE,
context.consumerGroupMemberState(groupId, memberId));
+
+ // Duplicate heartbeat with same request but epoch is now stale.
Review Comment:
nit: The comment is a bit misleading. Would it be better to say `Duplicate
heartbeat with same request but with the new epoch.`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]