lianetm commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1416609139
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ########## @@ -399,14 +400,51 @@ public void testHeartbeatState() { new ConsumerGroupHeartbeatResponseData.Assignment(); assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1)); ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData() - .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) - .setMemberId(memberId) - .setMemberEpoch(1) - .setAssignment(assignmentTopic1)); + .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) + .setMemberId(memberId) + .setMemberEpoch(1) + .setAssignment(assignmentTopic1)); membershipManager.onHeartbeatResponseReceived(rs1.data()); assertEquals(MemberState.RECONCILING, membershipManager.state()); } + @Test + public void testEnsureLeaveGroupWhenPollTimerExpires() { + membershipManager.transitionToJoining(); + time.sleep(1); + // Sending first heartbeat and transitioning to stable + assertHeartbeat(heartbeatRequestManager); + assertFalse(heartbeatRequestManager.pollTimer().isExpired()); + // Expires the poll timer, ensure sending a leave group + time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS); + assertLeaveGroup(heartbeatRequestManager); + assertTrue(heartbeatRequestManager.pollTimer().isExpired()); + // Poll again, ensure we heartbeat again. 
+ time.sleep(1); + heartbeatRequestManager.resetPollTimer(); + assertHeartbeat(heartbeatRequestManager); + assertFalse(heartbeatRequestManager.pollTimer().isExpired()); + } + + private void assertHeartbeat(HeartbeatRequestManager hrm) { + System.out.println("assertHeartbeat"); + NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds()); + assertEquals(1, pollResult.unsentRequests.size()); + assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, pollResult.timeUntilNextPollMs); + pollResult.unsentRequests.get(0).handler().onComplete(createHeartbeatResponse(pollResult.unsentRequests.get(0), + Errors.NONE)); + } + + private void assertLeaveGroup(HeartbeatRequestManager hrm) { + NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds()); + assertEquals(1, pollResult.unsentRequests.size()); + ConsumerGroupHeartbeatRequestData data = (ConsumerGroupHeartbeatRequestData) pollResult.unsentRequests.get(0).requestBuilder().build().data(); + assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH, data.memberEpoch()); + assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state()); Review Comment: Is this really the right end state when the poll timer expires? This means that the user will have to call `subscribe` to join the group again. Is that what the old coordinator requires? (I would expect the old code to only require the consumer to be polled again to rejoin the group. If my understanding is right, then we're missing logic after sending the last heartbeat (HB) to leave: on timer expiration we should send the last HB and then call `transitionToJoining` so that the member re-joins the group on the next poll.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org