lianetm commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1416609139
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -399,14 +400,51 @@ public void testHeartbeatState() {
new ConsumerGroupHeartbeatResponseData.Assignment();
assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1));
ConsumerGroupHeartbeatResponse rs1 = new
ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
- .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
- .setMemberId(memberId)
- .setMemberEpoch(1)
- .setAssignment(assignmentTopic1));
+ .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setAssignment(assignmentTopic1));
membershipManager.onHeartbeatResponseReceived(rs1.data());
assertEquals(MemberState.RECONCILING, membershipManager.state());
}
+ @Test
+ public void testEnsureLeaveGroupWhenPollTimerExpires() {
+ membershipManager.transitionToJoining();
+ time.sleep(1);
+ // Sending first heartbeat and transitioning to stable
+ assertHeartbeat(heartbeatRequestManager);
+ assertFalse(heartbeatRequestManager.pollTimer().isExpired());
+ // Expires the poll timer, ensure sending a leave group
+ time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS);
+ assertLeaveGroup(heartbeatRequestManager);
+ assertTrue(heartbeatRequestManager.pollTimer().isExpired());
+ // Poll again, ensure we heartbeat again.
+ time.sleep(1);
+ heartbeatRequestManager.resetPollTimer();
+ assertHeartbeat(heartbeatRequestManager);
+ assertFalse(heartbeatRequestManager.pollTimer().isExpired());
+ }
+
+ private void assertHeartbeat(HeartbeatRequestManager hrm) {
+ System.out.println("assertHeartbeat");
+ NetworkClientDelegate.PollResult pollResult =
hrm.poll(time.milliseconds());
+ assertEquals(1, pollResult.unsentRequests.size());
+ assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS,
pollResult.timeUntilNextPollMs);
+
pollResult.unsentRequests.get(0).handler().onComplete(createHeartbeatResponse(pollResult.unsentRequests.get(0),
+ Errors.NONE));
+ }
+
+ private void assertLeaveGroup(HeartbeatRequestManager hrm) {
+ NetworkClientDelegate.PollResult pollResult =
hrm.poll(time.milliseconds());
+ assertEquals(1, pollResult.unsentRequests.size());
+ ConsumerGroupHeartbeatRequestData data =
(ConsumerGroupHeartbeatRequestData)
pollResult.unsentRequests.get(0).requestBuilder().build().data();
+ assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH,
data.memberEpoch());
+ assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
Review Comment:
Is this really the right end state when the poll timer expires? This means
that the user will have to call `subscribe` to join the group again. Is that
what the old coordinator requires? (I expected that the old code just requires
the consumer to be polled again to join the group. If my understanding is right
then we're missing logic after sending the last HB to leave: on timer
expiration we should send the last HB and transitionToJoining so that the
member re-joins the group on the next poll)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]