This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2185004083e KAFKA-16251: Fix for not sending heartbeat while fenced 
(#15392)
2185004083e is described below

commit 2185004083ebb8f0b3a443132b5a33908c459c65
Author: Lianet Magrans <98415067+lian...@users.noreply.github.com>
AuthorDate: Fri Feb 23 04:56:05 2024 -0500

    KAFKA-16251: Fix for not sending heartbeat while fenced (#15392)
    
    Fix to ensure that a consumer that has been fenced by the coordinator stops 
sending heartbeats while it is on the FENCED state releasing its assignment 
(waiting for the onPartitionsLost callback to complete). Once the callback 
completes, the member transitions to JOINING and it's then when it should 
resume sending heartbeats again.
    
    Reviewers: Lucas Brutschy <lbruts...@confluent.io>
---
 .../consumer/internals/MembershipManagerImpl.java  | 14 +++++------
 .../internals/HeartbeatRequestManagerTest.java     | 27 ++++++++++++++++++++++
 .../internals/MembershipManagerImplTest.java       |  2 ++
 3 files changed, 35 insertions(+), 8 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index dd035506d4b..81e65dfd866 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -720,18 +720,16 @@ public class MembershipManagerImpl implements 
MembershipManager {
     }
 
     /**
-     * @return True if the member should not send heartbeats, which would be 
one of the following
-     * cases:
-     * <ul>
-     * <li>Member is not subscribed to any topics</li>
-     * <li>Member has received a fatal error in a previous heartbeat 
response</li>
-     * <li>Member is stale, meaning that it has left the group due to expired 
poll timer</li>
-     * </ul>
+     * @return True if the member should not send heartbeats, which is the 
case when it is in a
+     * state where it is not an active member of the group.
      */
     @Override
     public boolean shouldSkipHeartbeat() {
         MemberState state = state();
-        return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL 
|| state == MemberState.STALE;
+        return state == MemberState.UNSUBSCRIBED ||
+            state == MemberState.FATAL ||
+            state == MemberState.STALE ||
+            state == MemberState.FENCED;
     }
 
     /**
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
index 4016b74b27b..72a5c0349d2 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
@@ -663,6 +663,33 @@ public class HeartbeatRequestManagerTest {
         assertEquals((double) randomSleepS, 
getMetric("last-heartbeat-seconds-ago").metricValue());
     }
 
+    @Test
+    public void 
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
+        mockStableMember();
+
+        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+
+        // Receive HB response fencing member
+        when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);
+        doNothing().when(membershipManager).transitionToFenced();
+        ClientResponse response = 
createHeartbeatResponse(result.unsentRequests.get(0), 
Errors.FENCED_MEMBER_EPOCH);
+        result.unsentRequests.get(0).handler().onComplete(response);
+
+        verify(membershipManager).transitionToFenced();
+        verify(heartbeatRequestState).onFailedAttempt(anyLong());
+        verify(heartbeatRequestState).reset();
+
+        when(membershipManager.state()).thenReturn(MemberState.FENCED);
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size(), "Member should not send 
heartbeats while FENCED");
+
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size(), "Fenced member should 
resume heartbeat after transitioning to JOINING");
+    }
+
     private void assertHeartbeat(HeartbeatRequestManager hrm, int nextPollMs) {
         NetworkClientDelegate.PollResult pollResult = 
hrm.poll(time.milliseconds());
         assertEquals(1, pollResult.unsentRequests.size());
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
index f5c65d58bfc..50f28bb5233 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
@@ -1705,6 +1705,8 @@ public class MembershipManagerImplTest {
         assertEquals(0, listener.assignedCount());
         assertEquals(0, listener.lostCount());
 
+        assertTrue(membershipManager.shouldSkipHeartbeat(), "Member should not 
send heartbeat while fenced");
+
         // Step 3: invoke the callback
         performCallback(
                 membershipManager,

Reply via email to