jolshan commented on code in PR #13963: URL: https://github.com/apache/kafka/pull/13963#discussion_r1259983786
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -2402,6 +2448,584 @@ public void testOnNewMetadataImage() { assertEquals(image, context.groupMetadataManager.image()); } + @Test + public void testSessionTimeoutLifecycle() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build()) + .build(); + + assignor.prepareGroupAssignment(new GroupAssignment( + Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) + ))) + )); + + // Session timer is scheduled on first heartbeat. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = + context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(90000) + .setSubscribedTopicNames(Collections.singletonList("foo")) + .setTopicPartitions(Collections.emptyList())); + assertEquals(1, result.response().memberEpoch()); + + // Verify that there is a session time. + context.assertSessionTimeout(groupId, memberId, 45000); + + // Advance time. + assertEquals( + Collections.emptyList(), + context.sleep(result.response().heartbeatIntervalMs()) + ); + + // Session timer is rescheduled on second heartbeat. 
+ result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(result.response().memberEpoch())); + assertEquals(1, result.response().memberEpoch()); + + // Verify that there is a session time. + context.assertSessionTimeout(groupId, memberId, 45000); + + // Advance time. + assertEquals( + Collections.emptyList(), + context.sleep(result.response().heartbeatIntervalMs()) + ); + + // Session timer is cancelled on leave. + result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(-1)); + assertEquals(-1, result.response().memberEpoch()); + + // Verify that there are no timers. + context.assertNoSessionTimeout(groupId, memberId); + context.assertNoRevocationTimeout(groupId, memberId); + } + + @Test + public void testSessionTimeoutExpiration() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build()) + .build(); + + assignor.prepareGroupAssignment(new GroupAssignment( + Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) + ))) + )); + + // Session timer is scheduled on first heartbeat. 
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = + context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(90000) + .setSubscribedTopicNames(Collections.singletonList("foo")) + .setTopicPartitions(Collections.emptyList())); + assertEquals(1, result.response().memberEpoch()); + + // Verify that there is a session time. + context.assertSessionTimeout(groupId, memberId, 45000); + + // Advance time past the session timeout. + List<MockCoordinatorTimer.ExpiredTimeout<Record>> timeouts = context.sleep(45000 + 1); + + // Verify the expired timeout. + assertEquals( + Collections.singletonList(new MockCoordinatorTimer.ExpiredTimeout<Record>( + consumerGroupSessionTimeoutKey(groupId, memberId), + Arrays.asList( + RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId), + RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId), + RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId), + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, Collections.emptyMap()), + RecordHelpers.newGroupEpochRecord(groupId, 2) + ) + )), + timeouts + ); + + // Verify that there are no timers. + context.assertNoSessionTimeout(groupId, memberId); + context.assertNoRevocationTimeout(groupId, memberId); + } + + @Test + public void testRevocationTimeoutLifecycle() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. 
+ String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String memberId3 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 3) + .build()) + .build(); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2) + ))); + } + } + )); + + // Member 1 joins the group. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = + context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(90000) + .setSubscribedTopicNames(Collections.singletonList("foo")) + .setTopicPartitions(Collections.emptyList())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId1) + .setMemberEpoch(1) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setAssignedTopicPartitions(Arrays.asList( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(Arrays.asList(0, 1, 2))))), + result.response() + ); + + assertEquals( + Collections.emptyList(), + context.sleep(result.response().heartbeatIntervalMs()) + ); + + // Prepare next assignment. 
+ assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1) + ))); + put(memberId2, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 2) + ))); + } + } + )); + + // Member 2 joins the group. + result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(90000) + .setSubscribedTopicNames(Collections.singletonList("foo")) + .setTopicPartitions(Collections.emptyList())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId2) + .setMemberEpoch(2) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setPendingTopicPartitions(Arrays.asList( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(Arrays.asList(2))))), + result.response() + ); + + assertEquals( + Collections.emptyList(), + context.sleep(result.response().heartbeatIntervalMs()) + ); + + // Member 1 heartbeats and transitions to revoking. The revocation timeout + // is scheduled. + result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(1) + .setRebalanceTimeoutMs(90000) + .setSubscribedTopicNames(Collections.singletonList("foo"))); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId1) + .setMemberEpoch(1) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setAssignedTopicPartitions(Arrays.asList( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(Arrays.asList(0, 1))))), + result.response() + ); + + // Verify that there is a revocation timeout. 
+ context.assertRevocationTimeout(groupId, memberId1, 90000); + + assertEquals( + Collections.emptyList(), + context.sleep(result.response().heartbeatIntervalMs()) + ); + + // Prepare next assignment. + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0) + ))); + put(memberId2, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 2) + ))); + put(memberId3, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 1) + ))); + } + } + )); + + // Member 3 joins the group. + result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId3) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(90000) + .setSubscribedTopicNames(Collections.singletonList("foo")) + .setTopicPartitions(Collections.emptyList())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId3) + .setMemberEpoch(3) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setPendingTopicPartitions(Arrays.asList( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(Arrays.asList(1))))), + result.response() + ); + + assertEquals( + Collections.emptyList(), + context.sleep(result.response().heartbeatIntervalMs()) + ); + + // Member 1 heartbeats and re-transitions to revoking. The revocation timeout + // is re-scheduled. 
+ result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(1) + .setRebalanceTimeoutMs(90000) + .setSubscribedTopicNames(Collections.singletonList("foo"))); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId1) + .setMemberEpoch(1) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setAssignedTopicPartitions(Arrays.asList( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(Arrays.asList(0))))), + result.response() + ); + + // Verify that there is a revocation timeout. Keep a reference + // to the timeout for later. + MockCoordinatorTimer.ScheduledTimeout<Record> scheduledTimeout = + context.assertRevocationTimeout(groupId, memberId1, 90000); + + assertEquals( + Collections.emptyList(), + context.sleep(result.response().heartbeatIntervalMs()) + ); + + // Member 1 acks the revocation. The revocation timeout is cancelled. + result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(1) + .setRebalanceTimeoutMs(90000) + .setSubscribedTopicNames(Collections.singletonList("foo")) + .setTopicPartitions(Collections.singletonList(new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(Collections.singletonList(0))))); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId1) + .setMemberEpoch(3) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setAssignedTopicPartitions(Arrays.asList( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(Arrays.asList(0))))), + result.response() + ); + + // Verify that there is not revocation timeout. 
+ context.assertNoRevocationTimeout(groupId, memberId1); + + // Execute the scheduled revocation timeout captured earlier to simulate a + // stale timeout. This should be a no-op. + assertEquals(Collections.emptyList(), scheduledTimeout.operation.generateRecords()); + } + + @Test + public void testRevocationTimeoutExpiration() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 3) + .build()) + .build(); + + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2) + ))); + } + } + )); + + // Member 1 joins the group. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = + context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(10000) // Use timeout smaller than session timeout. 
+ .setSubscribedTopicNames(Collections.singletonList("foo")) + .setTopicPartitions(Collections.emptyList())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId1) + .setMemberEpoch(1) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setAssignedTopicPartitions(Arrays.asList( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(Arrays.asList(0, 1, 2))))), + result.response() + ); + + assertEquals( + Collections.emptyList(), + context.sleep(result.response().heartbeatIntervalMs()) + ); + + // Prepare next assignment. + assignor.prepareGroupAssignment(new GroupAssignment( + new HashMap<String, MemberAssignment>() { + { + put(memberId1, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1) + ))); + put(memberId2, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 2) + ))); + } + } + )); + + // Member 2 joins the group. + result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(10000) + .setSubscribedTopicNames(Collections.singletonList("foo")) + .setTopicPartitions(Collections.emptyList())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId2) + .setMemberEpoch(2) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setPendingTopicPartitions(Arrays.asList( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(Arrays.asList(2))))), + result.response() + ); + + assertEquals( + Collections.emptyList(), + context.sleep(result.response().heartbeatIntervalMs()) + ); + + // Member 1 heartbeats and transitions to revoking. The revocation timeout + // is scheduled. 
+ result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(1) + .setRebalanceTimeoutMs(90000) Review Comment: Where was the bug related to this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org