lianetm commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1416609139


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -399,14 +400,51 @@ public void testHeartbeatState() {
             new ConsumerGroupHeartbeatResponseData.Assignment();
         
assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1));
         ConsumerGroupHeartbeatResponse rs1 = new 
ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
-                .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
-                .setMemberId(memberId)
-                .setMemberEpoch(1)
-                .setAssignment(assignmentTopic1));
+            .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
+            .setMemberId(memberId)
+            .setMemberEpoch(1)
+            .setAssignment(assignmentTopic1));
         membershipManager.onHeartbeatResponseReceived(rs1.data());
         assertEquals(MemberState.RECONCILING, membershipManager.state());
     }
 
+    @Test
+    public void testEnsureLeaveGroupWhenPollTimerExpires() {
+        membershipManager.transitionToJoining();
+        time.sleep(1);
+        // Sending first heartbeat and transitioning to stable
+        assertHeartbeat(heartbeatRequestManager);
+        assertFalse(heartbeatRequestManager.pollTimer().isExpired());
+        // Expires the poll timer, ensure sending a leave group
+        time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS);
+        assertLeaveGroup(heartbeatRequestManager);
+        assertTrue(heartbeatRequestManager.pollTimer().isExpired());
+        // Poll again, ensure we heartbeat again.
+        time.sleep(1);
+        heartbeatRequestManager.resetPollTimer();
+        assertHeartbeat(heartbeatRequestManager);
+        assertFalse(heartbeatRequestManager.pollTimer().isExpired());
+    }
+
+    private void assertHeartbeat(HeartbeatRequestManager hrm) {
+        System.out.println("assertHeartbeat");
+        NetworkClientDelegate.PollResult pollResult = 
hrm.poll(time.milliseconds());
+        assertEquals(1, pollResult.unsentRequests.size());
+        assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, 
pollResult.timeUntilNextPollMs);
+        
pollResult.unsentRequests.get(0).handler().onComplete(createHeartbeatResponse(pollResult.unsentRequests.get(0),
+            Errors.NONE));
+    }
+
+    private void assertLeaveGroup(HeartbeatRequestManager hrm) {
+        NetworkClientDelegate.PollResult pollResult = 
hrm.poll(time.milliseconds());
+        assertEquals(1, pollResult.unsentRequests.size());
+        ConsumerGroupHeartbeatRequestData data = 
(ConsumerGroupHeartbeatRequestData) 
pollResult.unsentRequests.get(0).requestBuilder().build().data();
+        assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH, 
data.memberEpoch());
+        assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());

Review Comment:
   Is this really the right end state when the poll timer expires? This means 
that the user will have to call `subscribe` to join the group again. Is that 
what the old coordinator requires? (I expected that the old code just requires 
the consumer to be polled again to join the group. If my understanding is right 
then we're missing logic after sending the last HB to leave: on timer 
expiration we should send the last HB and transitionToJoining so that the 
member re-joins the group on the next poll) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to