This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 2185004083e KAFKA-16251: Fix for not sending heartbeat while fenced (#15392) 2185004083e is described below commit 2185004083ebb8f0b3a443132b5a33908c459c65 Author: Lianet Magrans <98415067+lian...@users.noreply.github.com> AuthorDate: Fri Feb 23 04:56:05 2024 -0500 KAFKA-16251: Fix for not sending heartbeat while fenced (#15392) Fix to ensure that a consumer that has been fenced by the coordinator stops sending heartbeats while it is in the FENCED state, releasing its assignment (waiting for the onPartitionsLost callback to complete). Once the callback completes, the member transitions to JOINING, and only then should it resume sending heartbeats. Reviewers: Lucas Brutschy <lbruts...@confluent.io> --- .../consumer/internals/MembershipManagerImpl.java | 14 +++++------ .../internals/HeartbeatRequestManagerTest.java | 27 ++++++++++++++++++++++ .../internals/MembershipManagerImplTest.java | 2 ++ 3 files changed, 35 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java index dd035506d4b..81e65dfd866 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java @@ -720,18 +720,16 @@ public class MembershipManagerImpl implements MembershipManager { } /** - * @return True if the member should not send heartbeats, which would be one of the following - * cases: - * <ul> - * <li>Member is not subscribed to any topics</li> - * <li>Member has received a fatal error in a previous heartbeat response</li> - * <li>Member is stale, meaning that it has left the group due to expired poll timer</li> - * </ul> + * @return True if the member should not send heartbeats, which is the case when 
it is in a + * state where it is not an active member of the group. */ @Override public boolean shouldSkipHeartbeat() { MemberState state = state(); - return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL || state == MemberState.STALE; + return state == MemberState.UNSUBSCRIBED || + state == MemberState.FATAL || + state == MemberState.STALE || + state == MemberState.FENCED; } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java index 4016b74b27b..72a5c0349d2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java @@ -663,6 +663,33 @@ public class HeartbeatRequestManagerTest { assertEquals((double) randomSleepS, getMetric("last-heartbeat-seconds-ago").metricValue()); } + @Test + public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { + mockStableMember(); + + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(1, result.unsentRequests.size()); + + // Receive HB response fencing member + when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); + doNothing().when(membershipManager).transitionToFenced(); + ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.FENCED_MEMBER_EPOCH); + result.unsentRequests.get(0).handler().onComplete(response); + + verify(membershipManager).transitionToFenced(); + verify(heartbeatRequestState).onFailedAttempt(anyLong()); + verify(heartbeatRequestState).reset(); + + when(membershipManager.state()).thenReturn(MemberState.FENCED); + result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(0, result.unsentRequests.size(), 
"Member should not send heartbeats while FENCED"); + + when(membershipManager.state()).thenReturn(MemberState.JOINING); + result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(1, result.unsentRequests.size(), "Fenced member should resume heartbeat after transitioning to JOINING"); + } + private void assertHeartbeat(HeartbeatRequestManager hrm, int nextPollMs) { NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds()); assertEquals(1, pollResult.unsentRequests.size()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java index f5c65d58bfc..50f28bb5233 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java @@ -1705,6 +1705,8 @@ public class MembershipManagerImplTest { assertEquals(0, listener.assignedCount()); assertEquals(0, listener.lostCount()); + assertTrue(membershipManager.shouldSkipHeartbeat(), "Member should not send heartbeat while fenced"); + // Step 3: invoke the callback performCallback( membershipManager,