dajac commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1260618876


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -2402,6 +2448,580 @@ public void testOnNewMetadataImage() {
         assertEquals(image, context.groupMetadataManager.image());
     }
 
+    @Test
+    public void testSessionTimeoutLifecycle() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        // Session timer is scheduled on first heartbeat.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(90000)
+                    .setSubscribedTopicNames(Collections.singletonList("foo"))
+                    .setTopicPartitions(Collections.emptyList()));
+        assertEquals(1, result.response().memberEpoch());
+
+        // Verify that there is a session time.
+        context.assertSessionTimeout(groupId, memberId, 45000);
+
+        // Advance time.
+        assertEquals(
+            Collections.emptyList(),
+            context.sleep(result.response().heartbeatIntervalMs())
+        );

Review Comment:
   > i'm a bit confused - if the timer advances by heartbeat interval then 
shouldn't the existing session timeout expire?
   
   The session timeout expires based on the session timeout not the heartbeat 
interval, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to