This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ef2941154e2 KAFKA-18931: added a share group session timeout task when
group coordinator is loaded (#19173)
ef2941154e2 is described below
commit ef2941154e2904c48295855d529cff34457e4fe1
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Mon Mar 10 21:13:30 2025 +0530
KAFKA-18931: added a share group session timeout task when group
coordinator is loaded (#19173)
This PR schedules a session timeout (via `scheduleShareGroupSessionTimeout`)
for every persisted member of a share group when the group coordinator is loaded.
Reviewers: Andrew Schofield <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 7 +-
.../group/GroupMetadataManagerTest.java | 227 +++++++++++++++++++++
2 files changed, 233 insertions(+), 1 deletion(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 4985c013760..fc954754de1 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -4885,7 +4885,12 @@ public class GroupMetadataManager {
break;
case SHARE:
- // Nothing for now for the ShareGroup, as no members are
persisted.
+ ShareGroup shareGroup = (ShareGroup) group;
+ log.info("Loaded share group {} with {} members.",
groupId, shareGroup.members().size());
+ shareGroup.members().forEach((memberId, member) -> {
+ log.debug("Loaded member {} in share group {}.",
memberId, groupId);
+ scheduleShareGroupSessionTimeout(groupId, memberId);
+ });
break;
default:
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index f83bd1c267b..6df8e237933 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -3204,6 +3204,188 @@ public class GroupMetadataManagerTest {
context.assertNoRebalanceTimeout(groupId, memberId);
}
+ @Test
+ public void testOnLoadedSessionTimeoutExpiration() {
+ String groupId = "group";
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ String memberId = "foo-1";
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder("foo-1")
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(9)
+ .setPreviousMemberEpoch(9)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+ .build())
+ .withAssignment(memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+ .withAssignmentEpoch(10))
+ .build();
+
+ // Let's assume that all the records have been replayed and now
+ // onLoaded is called to signal it.
+ context.groupMetadataManager.onLoaded();
+
+ // All members should have a session timeout in place.
+ assertNotNull(context.timer.timeout(groupSessionTimeoutKey(groupId,
memberId)));
+
+ // Advance time past the session timeout.
+ List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts =
context.sleep(45000 + 1);
+
+ // Verify the expired timeout.
+ assertEquals(
+ List.of(
+ new ExpiredTimeout<Void, CoordinatorRecord>(
+ groupSessionTimeoutKey(groupId, memberId),
+ new CoordinatorResult<>(
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId),
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId),
+
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
Map.of()),
+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
+ )
+ )
+ )
+ ),
+ timeouts
+ );
+
+ // Verify that there are no timers.
+ context.assertNoSessionTimeout(groupId, memberId);
+ }
+
+ @Test
+ public void testSessionTimeoutExpirationForShareMember() {
+ String groupId = "fooup";
+ // Generate the member id up front so the test can reference it throughout.
+ String memberId = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withShareGroupAssignor(assignor)
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addRacks()
+ .build())
+ .build();
+
+ assignor.prepareGroupAssignment(new GroupAssignment(
+ Map.of(memberId, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ )))
+ ));
+
+ // Session timer is scheduled on first heartbeat.
+ CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord>
result =
+ context.shareGroupHeartbeat(
+ new ShareGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setSubscribedTopicNames(List.of("foo")));
+ assertEquals(1, result.response().memberEpoch());
+
+ // Verify that there is a session timeout.
+ context.assertSessionTimeout(groupId, memberId, 45000);
+
+ // Advance time past the session timeout.
+ List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts =
context.sleep(45000 + 1);
+
+ // Verify the expired timeout.
+ assertEquals(
+ List.of(new ExpiredTimeout<Void, CoordinatorRecord>(
+ groupSessionTimeoutKey(groupId, memberId),
+ new CoordinatorResult<>(
+ List.of(
+
GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentTombstoneRecord(groupId,
memberId),
+
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord(groupId,
memberId),
+
GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord(groupId,
memberId),
+
GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId,
Map.of()),
+
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 2)
+ )
+ )
+ )),
+ timeouts
+ );
+
+ // Verify that there are no timers.
+ context.assertNoSessionTimeout(groupId, memberId);
+ }
+
+ @Test
+ public void testOnLoadedSessionTimeoutExpirationForShareMember() {
+ String groupId = "group";
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ String memberId = "foo-1";
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .build())
+ .withShareGroup(new ShareGroupBuilder(groupId, 10)
+ .withMember(new ShareGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(9)
+ .setPreviousMemberEpoch(9)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+ .build())
+ .withAssignment(memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+ .withAssignmentEpoch(10))
+ .build();
+
+ // Let's assume that all the records have been replayed and now
+ // onLoaded is called to signal it.
+ context.groupMetadataManager.onLoaded();
+
+ // All members should have a session timeout in place.
+ assertNotNull(context.timer.timeout(groupSessionTimeoutKey(groupId,
memberId)));
+
+ // Advance time past the session timeout.
+ List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts =
context.sleep(45000 + 1);
+
+ // Verify the expired timeout.
+ assertEquals(
+ List.of(
+ new ExpiredTimeout<Void, CoordinatorRecord>(
+ groupSessionTimeoutKey(groupId, memberId),
+ new CoordinatorResult<>(
+ List.of(
+
GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentTombstoneRecord(groupId,
memberId),
+
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord(groupId,
memberId),
+
GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord(groupId,
memberId),
+
GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId,
Map.of()),
+
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11)
+ )
+ )
+ )
+ ),
+ timeouts
+ );
+
+ // Verify that there are no timers.
+ context.assertNoSessionTimeout(groupId, memberId);
+ }
+
@Test
public void testSessionTimeoutExpirationStaticMember() {
String groupId = "fooup";
@@ -3679,6 +3861,51 @@ public class GroupMetadataManagerTest {
assertNotNull(context.timer.timeout(groupRebalanceTimeoutKey("foo",
"foo-1")));
}
+ @Test
+ public void testOnLoadedWithShareGroup() {
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 3)
+ .build())
+ .withShareGroup(new ShareGroupBuilder("foo", 10)
+ .withMember(new ShareGroupMember.Builder("foo-1")
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(9)
+ .setPreviousMemberEpoch(9)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .build())
+ .withMember(new ShareGroupMember.Builder("foo-2")
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .build())
+ .withAssignment("foo-1", mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5)))
+ .withAssignmentEpoch(10))
+ .build();
+
+ // Let's assume that all the records have been replayed and now
+ // onLoaded is called to signal it.
+ context.groupMetadataManager.onLoaded();
+
+ // All members should have a session timeout in place.
+ assertNotNull(context.timer.timeout(groupSessionTimeoutKey("foo",
"foo-1")));
+ assertNotNull(context.timer.timeout(groupSessionTimeoutKey("foo",
"foo-2")));
+ }
+
@Test
public void testUpdateGroupSizeCounter() {
List<String> groupIds = new ArrayList<>();