This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ca9f4aeda76 KAFKA-16639 Ensure HeartbeatRequestManager generates leave 
request regardless of in-flight heartbeats. (#16017)
ca9f4aeda76 is described below

commit ca9f4aeda769e05222e1734dd93ab95dc27d47eb
Author: TingIāu "Ting" Kì <51072200+frankvi...@users.noreply.github.com>
AuthorDate: Sat Jun 1 04:14:15 2024 +0800

    KAFKA-16639 Ensure HeartbeatRequestManager generates leave request 
regardless of in-flight heartbeats. (#16017)
    
    Fix the bug where the leave-group heartbeat is not sent when a newly 
created consumer is immediately closed.
    
    When a heartbeat request is in flight and the consumer is then closed, 
the current code in HeartbeatRequestManager does not send the closing 
heartbeat, because the previous heartbeat request is still in flight. 
However, the closing heartbeat is only sent once, so in this situation the 
broker will not know that the consumer has left the consumer group until 
the consumer's heartbeat times out.
    This situation causes the broker to wait until the consumer's heartbeat 
times out before triggering a consumer group rebalance, which in turn affects 
message consumption.
    
    Reviewers: Lianet Magrans <liane...@gmail.com>, Chia-Ping Tsai 
<chia7...@gmail.com>
---
 .../consumer/internals/HeartbeatRequestManager.java |  7 ++++++-
 .../internals/HeartbeatRequestManagerTest.java      | 21 ++++++++++++++++++++-
 2 files changed, 26 insertions(+), 2 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
index a956ef3a939..d31d412c655 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
@@ -45,6 +45,7 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
 
+
 /**
  * <p>Manages the request creation and response handling for the heartbeat. 
The module creates a
  * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
MembershipManager} and enqueue it to
@@ -208,7 +209,11 @@ public class HeartbeatRequestManager implements 
RequestManager {
             return new 
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, 
Collections.singletonList(leaveHeartbeat));
         }
 
-        boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && 
!heartbeatRequestState.requestInFlight();
+        // Case 1: The member is leaving
+        boolean heartbeatNow = membershipManager.state() == 
MemberState.LEAVING ||
+                // Case 2: The member state indicates it should send a 
heartbeat without waiting for the interval, and there is no heartbeat request 
currently in-flight
+                (membershipManager.shouldHeartbeatNow() && 
!heartbeatRequestState.requestInFlight());
+
         if (!heartbeatRequestState.canSendRequest(currentTimeMs) && 
!heartbeatNow) {
             return new 
NetworkClientDelegate.PollResult(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
         }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
index 8334fb23605..f63dd55754a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
@@ -277,7 +277,7 @@ public class HeartbeatRequestManagerTest {
         result = heartbeatRequestManager.poll(time.milliseconds());
         assertEquals(0, result.unsentRequests.size(), "No heartbeat should be 
sent while a " +
             "previous one is in-flight");
-        
+
         time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
         result = heartbeatRequestManager.poll(time.milliseconds());
         assertEquals(0, result.unsentRequests.size(), "No heartbeat should be 
sent when the " +
@@ -752,6 +752,25 @@ public class HeartbeatRequestManagerTest {
         assertEquals(1, result.unsentRequests.size(), "Fenced member should 
resume heartbeat after transitioning to JOINING");
     }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
+    public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final 
short version) {
+        mockStableMember();
+        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size(), "No heartbeat should be 
sent while a previous one is in-flight");
+
+        membershipManager.leaveGroup();
+
+        ConsumerGroupHeartbeatRequest heartbeatToLeave = 
getHeartbeatRequest(heartbeatRequestManager, version);
+        assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH, 
heartbeatToLeave.data().memberEpoch());
+
+        NetworkClientDelegate.PollResult pollAgain = 
heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, pollAgain.unsentRequests.size());
+    }
+
     private void assertHeartbeat(HeartbeatRequestManager hrm, int nextPollMs) {
         NetworkClientDelegate.PollResult pollResult = 
hrm.poll(time.milliseconds());
         assertEquals(1, pollResult.unsentRequests.size());

Reply via email to