jeffkbkim commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1260535581


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -2402,6 +2448,580 @@ public void testOnNewMetadataImage() {
         assertEquals(image, context.groupMetadataManager.image());
     }
 
+    @Test
+    public void testSessionTimeoutLifecycle() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        // Session timer is scheduled on first heartbeat.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(90000)
+                    .setSubscribedTopicNames(Collections.singletonList("foo"))
+                    .setTopicPartitions(Collections.emptyList()));
+        assertEquals(1, result.response().memberEpoch());
+
+        // Verify that there is a session time.
+        context.assertSessionTimeout(groupId, memberId, 45000);
+
+        // Advance time.
+        assertEquals(
+            Collections.emptyList(),
+            context.sleep(result.response().heartbeatIntervalMs())
+        );

Review Comment:
   I'm a bit confused: if the timer advances by the heartbeat interval, shouldn't the existing session timeout expire?
   
   Or is the member removed from the group after this block?
   
   I think it would also be good to call `assertSessionTimeout` again after advancing the clock.
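
   To make the timing question concrete, here is a minimal, self-contained sketch. It is not the coordinator's actual timer: `MockTimer`, its methods, and the 5000 ms heartbeat interval are assumptions made for illustration. Only the 45000 ms session timeout comes from the test above. Under those assumptions, advancing the clock by one heartbeat interval leaves the session timer pending with less time remaining, which is what a second assertion after the sleep could check.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SessionTimerSketch {
    // Hypothetical stand-in for a mock timer; this is NOT Kafka's API.
    static final class MockTimer {
        private final Map<String, Long> deadlines = new HashMap<>();
        private long nowMs = 0;

        // Schedule (or reschedule) a timer that fires delayMs from now.
        void schedule(String key, long delayMs) {
            deadlines.put(key, nowMs + delayMs);
        }

        // Remaining delay in ms, or -1 if no timer is scheduled for the key.
        long remainingMs(String key) {
            Long deadline = deadlines.get(key);
            return deadline == null ? -1 : deadline - nowMs;
        }

        // Advance the clock and return the keys whose deadlines have passed.
        List<String> advance(long ms) {
            nowMs += ms;
            List<String> expired = new ArrayList<>();
            deadlines.entrySet().removeIf(entry -> {
                if (entry.getValue() <= nowMs) {
                    expired.add(entry.getKey());
                    return true;
                }
                return false;
            });
            return expired;
        }
    }

    public static void main(String[] args) {
        long sessionTimeoutMs = 45000;    // value asserted in the test above
        long heartbeatIntervalMs = 5000;  // assumed; the diff does not state it

        MockTimer timer = new MockTimer();
        timer.schedule("session-timeout", sessionTimeoutMs);

        // Sleeping for one heartbeat interval expires nothing when the
        // interval is shorter than the session timeout.
        List<String> expired = timer.advance(heartbeatIntervalMs);
        System.out.println("expired=" + expired
            + " remaining=" + timer.remainingMs("session-timeout"));
    }
}
```

   If the real heartbeat interval were equal to or larger than the session timeout, the same sleep would expire the timer, so asserting the remaining timeout after the sleep would document which case the test relies on.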


