This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new ca9f4aeda76 KAFKA-16639 Ensure HeartbeatRequestManager generates leave request regardless of in-flight heartbeats. (#16017) ca9f4aeda76 is described below commit ca9f4aeda769e05222e1734dd93ab95dc27d47eb Author: TingIāu "Ting" Kì <51072200+frankvi...@users.noreply.github.com> AuthorDate: Sat Jun 1 04:14:15 2024 +0800 KAFKA-16639 Ensure HeartbeatRequestManager generates leave request regardless of in-flight heartbeats. (#16017) Fix the bug where the heartbeat is not sent when a newly created consumer is immediately closed. When there is a heartbeat request in flight and the consumer is then closed. In the current code, the HeartbeatRequestManager does not correctly send the closing heartbeat because a previous heartbeat request is still in flight. However, the closing heartbeat is only sent once, so in this situation, the broker will not know that the consumer has left the consumer group until the consumer's heartbeat times out. This situation causes the broker to wait until the consumer's heartbeat times out before triggering a consumer group rebalance, which in turn affects message consumption. Reviewers: Lianet Magrans <liane...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com> --- .../consumer/internals/HeartbeatRequestManager.java | 7 ++++++- .../internals/HeartbeatRequestManagerTest.java | 21 ++++++++++++++++++++- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java index a956ef3a939..d31d412c655 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java @@ -45,6 +45,7 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.stream.Collectors; + /** * <p>Manages the request creation and response handling for the heartbeat. The module creates a * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link MembershipManager} and enqueue it to @@ -208,7 +209,11 @@ public class HeartbeatRequestManager implements RequestManager { return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(leaveHeartbeat)); } - boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight(); + // Case 1: The member is leaving + boolean heartbeatNow = membershipManager.state() == MemberState.LEAVING || + // Case 2: The member state indicates it should send a heartbeat without waiting for the interval, and there is no heartbeat request currently in-flight + (membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight()); + if (!heartbeatRequestState.canSendRequest(currentTimeMs) && !heartbeatNow) { return new NetworkClientDelegate.PollResult(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs)); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java index 8334fb23605..f63dd55754a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java @@ -277,7 +277,7 @@ public class HeartbeatRequestManagerTest { result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a " + "previous one is in-flight"); - + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent when the " + @@ -752,6 +752,25 @@ public class HeartbeatRequestManagerTest { assertEquals(1, result.unsentRequests.size(), "Fenced member should resume heartbeat after transitioning to JOINING"); } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT) + public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final short version) { + mockStableMember(); + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(1, result.unsentRequests.size()); + result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a previous one is in-flight"); + + membershipManager.leaveGroup(); + + ConsumerGroupHeartbeatRequest heartbeatToLeave = getHeartbeatRequest(heartbeatRequestManager, version); + assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH, heartbeatToLeave.data().memberEpoch()); + + NetworkClientDelegate.PollResult pollAgain = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(0, pollAgain.unsentRequests.size()); + } + private void assertHeartbeat(HeartbeatRequestManager hrm, int nextPollMs) { NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds()); assertEquals(1, pollResult.unsentRequests.size());