lianetm commented on code in PR #21319:
URL: https://github.com/apache/kafka/pull/21319#discussion_r2699579624
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -694,6 +694,388 @@ public void
testMemberCanRejoinWithEpochZeroInUnreleasedPartitionsState() {
);
}
+ @Test
+ public void testDuplicateFullHeartbeatInStableState() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ // Member is in STABLE state with epoch 100.
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(100)
+ .setPreviousMemberEpoch(99)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId,
0, 1, 2)))
+ .build();
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
member));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
100, computeGroupHash(Map.of(
+ fooTopicName, fooTopicHash
+ ))));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)
+ )));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
100));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member));
+
+ assertEquals(MemberState.STABLE,
context.consumerGroupMemberState(groupId, memberId));
+
+ // Create full request with current epoch.
+ ConsumerGroupHeartbeatRequestData fullRequest = new
ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(100)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignor("range")
+ .setTopicPartitions(List.of(
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1, 2))));
+
+ // First heartbeat.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result1 =
+ context.consumerGroupHeartbeat(fullRequest);
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(100)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1, 2))))),
+ result1.response()
+ );
+
+ // Duplicate heartbeat.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result2 =
+ context.consumerGroupHeartbeat(fullRequest);
+
+ // Verify duplicate produces same response with no records.
+ assertResponseEquals(result1.response(), result2.response());
+ assertEquals(List.of(), result2.records());
+ assertEquals(MemberState.STABLE,
context.consumerGroupMemberState(groupId, memberId));
+ }
+
+ @Test
+ public void testDuplicateFullHeartbeatInUnrevokedPartitionsState() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ // Member is in UNREVOKED_PARTITIONS state with epoch 100.
+ // Target assignment is [0, 1], but member still owns [0, 1, 2].
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.UNREVOKED_PARTITIONS)
+ .setMemberEpoch(100)
+ .setPreviousMemberEpoch(99)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId,
0, 1)))
+
.setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 2)))
+ .build();
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
member));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
101, computeGroupHash(Map.of(
+ fooTopicName, fooTopicHash
+ ))));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1)
+ )));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
101));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member));
+
+ assertEquals(MemberState.UNREVOKED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId));
+
+ // Create full request with current epoch. Member still reports owning
all partitions.
+ ConsumerGroupHeartbeatRequestData fullRequest = new
ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(100)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignor("range")
+ .setTopicPartitions(List.of(
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1, 2))));
+
+ // First heartbeat.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result1 =
+ context.consumerGroupHeartbeat(fullRequest);
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(100)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1))))),
+ result1.response()
+ );
+
+ assertEquals(MemberState.UNREVOKED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId));
+
+ // Duplicate heartbeat.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result2 =
+ context.consumerGroupHeartbeat(fullRequest);
+
+ // Verify duplicate produces same response with no records.
+ assertResponseEquals(result1.response(), result2.response());
+ assertEquals(List.of(), result2.records());
+ assertEquals(MemberState.UNREVOKED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId));
+ }
+
+ @Test
+ public void testDuplicateFullHeartbeatInUnreleasedPartitionsState() {
+ String groupId = "fooup";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ // Member 1 is in UNRELEASED_PARTITIONS state with epoch 100.
+ // Member 1 has [0] assigned but target is [0, 1, 2].
+ // Member 2 still owns [1, 2] and needs to revoke them.
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.UNRELEASED_PARTITIONS)
+ .setMemberEpoch(100)
+ .setPreviousMemberEpoch(99)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId,
0)))
+ .build();
+
+ ConsumerGroupMember member2 = new
ConsumerGroupMember.Builder(memberId2)
+ .setState(MemberState.UNREVOKED_PARTITIONS)
+ .setMemberEpoch(99)
+ .setPreviousMemberEpoch(98)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment())
+
.setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 1,
2)))
+ .build();
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
member1));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
member2));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
100, computeGroupHash(Map.of(
+ fooTopicName, fooTopicHash
+ ))));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)
+ )));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, mkAssignment()));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
100));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member1));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member2));
+
+ assertEquals(MemberState.UNRELEASED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId1));
+
+ // Create full request with current epoch.
+ ConsumerGroupHeartbeatRequestData fullRequest = new
ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId1)
+ .setMemberEpoch(100)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignor("range")
+ .setTopicPartitions(List.of(
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0))));
+
+ // First heartbeat. Member is UNRELEASED_PARTITIONS so response
includes current assignment.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result1 =
+ context.consumerGroupHeartbeat(fullRequest);
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId1)
+ .setMemberEpoch(100)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0))))),
+ result1.response()
+ );
+
+ assertEquals(MemberState.UNRELEASED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId1));
+
+ // Duplicate heartbeat.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result2 =
+ context.consumerGroupHeartbeat(fullRequest);
+
+ // Verify duplicate produces same response with no records.
+ assertResponseEquals(result1.response(), result2.response());
+ assertEquals(List.of(), result2.records());
+ assertEquals(MemberState.UNRELEASED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId1));
+ }
+
+ @Test
+ public void testDuplicateFullHeartbeatWithRevocationAck() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ // Member is in UNREVOKED_PARTITIONS state with epoch 100.
+ // Target assignment is [0, 1], member needs to revoke [2].
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.UNREVOKED_PARTITIONS)
+ .setMemberEpoch(100)
+ .setPreviousMemberEpoch(99)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId,
0, 1)))
+
.setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 2)))
+ .build();
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
member));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
101, computeGroupHash(Map.of(
+ fooTopicName, fooTopicHash
+ ))));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1)
+ )));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
101));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member));
+
+ assertEquals(MemberState.UNREVOKED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId));
+
+ // Create full request acknowledging revocation (only owns [0, 1]).
+ ConsumerGroupHeartbeatRequestData fullRequest = new
ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(100)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignor("range")
+ .setTopicPartitions(List.of(
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1))));
+
+ // First heartbeat acknowledges revocation and transitions to STABLE.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result1 =
+ context.consumerGroupHeartbeat(fullRequest);
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(101)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1))))),
+ result1.response()
+ );
+
+ assertEquals(MemberState.STABLE,
context.consumerGroupMemberState(groupId, memberId));
+
+ // Duplicate heartbeat with same request but epoch is now stale.
Review Comment:
Agreed, this is confusing. But if the intention is what the test name
`testDuplicateFullHeartbeatWithRevocationAck` says, I guess the comment is right
but the epoch 101 is wrong (it should be 100).
Shouldn't we just remove the `duplicateRequest` and send the same
`fullRequest` from above?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]