This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 266532f562e KAFKA-18736: Handle errors in the Streams group heartbeat request manager (#19230)
266532f562e is described below

commit 266532f562ea58a5a6dbc7510912dcf72fc20c4a
Author: Bruno Cadonna <[email protected]>
AuthorDate: Mon Mar 24 21:26:14 2025 +0100

    KAFKA-18736: Handle errors in the Streams group heartbeat request manager (#19230)
    
    This commit adds error handling to the Streams heartbeat request
    manager.
    
    Errors can occur while sending a heartbeat request and when a response
    with an error code that is not NONE is received.
    
    Some errors are handled explicitly to recover from them or to log
    specific messages. All the others are handled as fatal errors.
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../StreamsGroupHeartbeatRequestManager.java       | 152 ++++++++
 .../internals/StreamsMembershipManager.java        |  21 +-
 .../StreamsGroupHeartbeatRequestManagerTest.java   | 429 ++++++++++++++++++++-
 .../internals/StreamsMembershipManagerTest.java    |  24 +-
 4 files changed, 604 insertions(+), 22 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index 2aa466e7f60..bf67b953dad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -22,6 +22,9 @@ import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 import org.apache.kafka.common.metrics.Metrics;
@@ -59,6 +62,11 @@ import java.util.stream.IntStream;
  */
 public class StreamsGroupHeartbeatRequestManager implements RequestManager {
 
+    private static final String UNSUPPORTED_VERSION_ERROR_MESSAGE = "The cluster does not support the STREAMS group " +
+        "protocol or does not support the versions of the STREAMS group protocol used by this client " +
+        "(used versions: " + StreamsGroupHeartbeatRequestData.LOWEST_SUPPORTED_VERSION + " to " +
+        StreamsGroupHeartbeatRequestData.HIGHEST_SUPPORTED_VERSION + ").";
+
     static class HeartbeatState {
 
         // Fields of StreamsGroupHeartbeatRequest sent in the most recent request
@@ -409,6 +417,8 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager {
             if (response != null) {
                 metricsManager.recordRequestLatency(response.requestLatencyMs());
                 onResponse((StreamsGroupHeartbeatResponse) response.responseBody(), completionTimeMs);
+            } else {
+                onFailure(exception, completionTimeMs);
             }
         });
     }
@@ -428,6 +438,8 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager {
     private void onResponse(final StreamsGroupHeartbeatResponse response, long currentTimeMs) {
         if (Errors.forCode(response.data().errorCode()) == Errors.NONE) {
             onSuccessResponse(response, currentTimeMs);
+        } else {
+            onErrorResponse(response, currentTimeMs);
         }
     }
 
@@ -451,6 +463,146 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager {
         membershipManager.onHeartbeatSuccess(response);
     }
 
+    private void onErrorResponse(final StreamsGroupHeartbeatResponse response, final long currentTimeMs) {
+        final Errors error = Errors.forCode(response.data().errorCode());
+        final String errorMessage = response.data().errorMessage();
+
+        heartbeatState.reset();
+        this.heartbeatRequestState.onFailedAttempt(currentTimeMs);
+
+        switch (error) {
+            case NOT_COORDINATOR:
+                logInfo(
+                    String.format("StreamsGroupHeartbeatRequest failed because the group coordinator %s is incorrect. " +
+                        "Will attempt to find the coordinator again and retry", coordinatorRequestManager.coordinator()),
+                    response,
+                    currentTimeMs
+                );
+                coordinatorRequestManager.markCoordinatorUnknown(errorMessage, currentTimeMs);
+                // Skip backoff so that the next HB is sent as soon as the new coordinator is discovered
+                heartbeatRequestState.reset();
+                break;
+
+            case COORDINATOR_NOT_AVAILABLE:
+                logInfo(
+                    String.format("StreamsGroupHeartbeatRequest failed because the group coordinator %s is not available. " +
+                        "Will attempt to find the coordinator again and retry", coordinatorRequestManager.coordinator()),
+                    response,
+                    currentTimeMs
+                );
+                coordinatorRequestManager.markCoordinatorUnknown(errorMessage, currentTimeMs);
+                // Skip backoff so that the next HB is sent as soon as the new coordinator is discovered
+                heartbeatRequestState.reset();
+                break;
+
+            case COORDINATOR_LOAD_IN_PROGRESS:
+                logInfo(
+                    String.format("StreamsGroupHeartbeatRequest failed because the group coordinator %s is still loading. " +
+                    "Will retry", coordinatorRequestManager.coordinator()),
+                    response,
+                    currentTimeMs
+                );
+                break;
+
+            case GROUP_AUTHORIZATION_FAILED:
+                GroupAuthorizationException exception =
+                    GroupAuthorizationException.forGroupId(membershipManager.groupId());
+                logger.error("StreamsGroupHeartbeatRequest failed due to group authorization failure: {}",
+                    exception.getMessage());
+                handleFatalFailure(error.exception(exception.getMessage()));
+                break;
+
+            case TOPIC_AUTHORIZATION_FAILED:
+                logger.error("StreamsGroupHeartbeatRequest failed for member {} with state {} due to {}: {}",
+                    membershipManager.memberId(), membershipManager.state(), error, errorMessage);
+                // Propagate auth error received in HB so that it's returned on poll.
+                // Member should stay in its current state so it can recover if ever the missing ACLs are added.
+                backgroundEventHandler.add(new ErrorEvent(error.exception()));
+                break;
+
+            case INVALID_REQUEST:
+            case GROUP_MAX_SIZE_REACHED:
+            case STREAMS_INVALID_TOPOLOGY:
+            case STREAMS_INVALID_TOPOLOGY_EPOCH:
+            case STREAMS_TOPOLOGY_FENCED:
+                logger.error("StreamsGroupHeartbeatRequest failed due to {}: {}", error, errorMessage);
+                handleFatalFailure(error.exception(errorMessage));
+                break;
+
+            case FENCED_MEMBER_EPOCH:
+                logInfo(
+                    String.format("StreamsGroupHeartbeatRequest failed for member %s because epoch %s is fenced.",
+                        membershipManager.memberId(), membershipManager.memberEpoch()),
+                    response,
+                    currentTimeMs
+                );
+                membershipManager.onFenced();
+                // Skip backoff so that a next HB to rejoin is sent as soon as the fenced member releases its assignment
+                heartbeatRequestState.reset();
+                break;
+
+            case UNKNOWN_MEMBER_ID:
+                logInfo(
+                    String.format("StreamsGroupHeartbeatRequest failed because member %s is unknown.",
+                        membershipManager.memberId()),
+                    response,
+                    currentTimeMs
+                );
+                membershipManager.onFenced();
+                // Skip backoff so that a next HB to rejoin is sent as soon as the fenced member releases its assignment
+                heartbeatRequestState.reset();
+                break;
+
+            case UNSUPPORTED_VERSION:
+                logger.error("StreamsGroupHeartbeatRequest failed due to {}: {}", error, UNSUPPORTED_VERSION_ERROR_MESSAGE);
+                handleFatalFailure(error.exception(UNSUPPORTED_VERSION_ERROR_MESSAGE));
+                break;
+
+            default:
+                logger.error("StreamsGroupHeartbeatRequest failed due to unexpected error {}: {}", error, errorMessage);
+                handleFatalFailure(error.exception(errorMessage));
+        }
+        membershipManager.onFatalHeartbeatFailure();
+    }
+
+    private void logInfo(final String message,
+                         final StreamsGroupHeartbeatResponse response,
+                         final long currentTimeMs) {
+        logger.info("{} in {}ms: {}",
+            message,
+            heartbeatRequestState.remainingBackoffMs(currentTimeMs),
+            response.data().errorMessage());
+    }
+
+    private void onFailure(final Throwable exception, final long responseTimeMs) {
+        heartbeatRequestState.onFailedAttempt(responseTimeMs);
+        heartbeatState.reset();
+        if (exception instanceof RetriableException) {
+            coordinatorRequestManager.handleCoordinatorDisconnect(exception, responseTimeMs);
+            String message = String.format("StreamsGroupHeartbeatRequest failed because of a retriable exception. Will retry in %s ms: %s",
+                heartbeatRequestState.remainingBackoffMs(responseTimeMs),
+                exception.getMessage());
+            logger.debug(message);
+            membershipManager.onRetriableHeartbeatFailure();
+        } else {
+            if (exception instanceof UnsupportedVersionException) {
+                logger.error("StreamsGroupHeartbeatRequest failed because of an unsupported version exception: {}",
+                    exception.getMessage());
+                handleFatalFailure(new UnsupportedVersionException(UNSUPPORTED_VERSION_ERROR_MESSAGE));
+            } else {
+                logger.error("StreamsGroupHeartbeatRequest failed because of a fatal exception while sending request: {}",
+                    exception.getMessage());
+                handleFatalFailure(exception);
+            }
+            membershipManager.onFatalHeartbeatFailure();
+        }
+    }
+
+    private void handleFatalFailure(Throwable error) {
+        backgroundEventHandler.add(new ErrorEvent(error));
+        membershipManager.transitionToFatal();
+    }
+
     private static Map<StreamsRebalanceData.HostInfo, List<TopicPartition>> convertHostInfoMap(final StreamsGroupHeartbeatResponseData data) {
         Map<StreamsRebalanceData.HostInfo, List<TopicPartition>> partitionsByHost = new HashMap<>();
         data.partitionsByUserEndpoint().forEach(endpoint -> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
index f5f269f52b5..a5684bea83a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
@@ -702,14 +702,21 @@ public class StreamsMembershipManager implements RequestManager {
     }
 
     /**
-     * Notify the member that an error heartbeat response was received.
-     *
-     * @param retriable True if the request failed with a retriable error.
+     * Notify the member that a retriable error heartbeat response was received.
      */
-    public void onHeartbeatFailure(boolean retriable) {
-        if (!retriable) {
-            metricsManager.maybeRecordRebalanceFailed();
-        }
+    public void onRetriableHeartbeatFailure() {
+        onHeartbeatFailure();
+    }
+
+    /**
+     * Notify the member that a fatal error heartbeat response was received.
+     */
+    public void onFatalHeartbeatFailure() {
+        metricsManager.maybeRecordRebalanceFailed();
+        onHeartbeatFailure();
+    }
+
+    private void onHeartbeatFailure() {
         // The leave group request is sent out once (not retried), so we should complete the leave
         // operation once the request completes, regardless of the response.
         if (state == MemberState.UNSUBSCRIBED && maybeCompleteLeaveInProgress()) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index 8acf48bbc6b..126be01e1f5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -22,14 +22,20 @@ import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest;
 import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -42,10 +48,12 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.MockedConstruction;
 import org.mockito.junit.jupiter.MockitoExtension;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -61,6 +69,7 @@ import java.util.stream.Stream;
 import static org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -529,7 +538,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
             verify(heartbeatRequestState).onSendAttempt(time.milliseconds());
             verify(membershipManager).onHeartbeatRequestGenerated();
             final ClientResponse response = buildClientResponse();
-            networkRequest.future().complete(response);
+            networkRequest.handler().onComplete(response);
             verify(heartbeatRequestState, never()).updateHeartbeatIntervalMs(anyLong());
             verify(heartbeatRequestState, never()).onSuccessfulAttempt(anyLong());
             verify(membershipManager, never()).onHeartbeatSuccess(any());
@@ -572,7 +581,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
                 metrics.metric(metrics.metricName("last-heartbeat-seconds-ago", "consumer-coordinator-metrics")).metricValue()
             );
             final ClientResponse response = buildClientResponse();
-            networkRequest.future().complete(response);
+            networkRequest.handler().onComplete(response);
             verify(membershipManager).onHeartbeatSuccess((StreamsGroupHeartbeatResponse) response.responseBody());
             verify(heartbeatRequestState).updateHeartbeatIntervalMs(RECEIVED_HEARTBEAT_INTERVAL_MS);
             verify(heartbeatRequestState).onSuccessfulAttempt(networkRequest.handler().completionTimeMs());
@@ -975,6 +984,404 @@ class StreamsGroupHeartbeatRequestManagerTest {
         assertTrue(requestDataWithShutdownRequest.shutdownApplication());
     }
 
+    @Test
+    public void testCoordinatorDisconnectFailureWhileSending() {
+        try (
+            final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
+                HeartbeatRequestState.class,
+                (mock, context) -> {
+                    when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
+                });
+            final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
+                StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
+        ) {
+            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
+            final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
+            when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+
+            final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+
+            assertEquals(1, result.unsentRequests.size());
+            final NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);
+            time.sleep(1234);
+            final long completionTimeMs = time.milliseconds();
+            final DisconnectException disconnectException = DisconnectException.INSTANCE;
+            networkRequest.handler().onFailure(completionTimeMs, disconnectException);
+            final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0);
+            verify(heartbeatRequestState).onFailedAttempt(completionTimeMs);
+            verify(heartbeatState).reset();
+            verify(coordinatorRequestManager).handleCoordinatorDisconnect(disconnectException, completionTimeMs);
+            verify(membershipManager).onRetriableHeartbeatFailure();
+        }
+    }
+
+    @Test
+    public void testUnsupportedVersionFailureWhileSending() {
+        try (
+            final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
+                HeartbeatRequestState.class,
+                (mock, context) -> {
+                    when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
+                });
+            final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
+                StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
+        ) {
+            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
+            final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
+            when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+
+            final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+
+            assertEquals(1, result.unsentRequests.size());
+            final NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);
+            time.sleep(1234);
+            final long completionTimeMs = time.milliseconds();
+            final UnsupportedVersionException unsupportedVersionException = new UnsupportedVersionException("message");
+            networkRequest.handler().onFailure(completionTimeMs, unsupportedVersionException);
+            final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0);
+            verify(heartbeatRequestState).onFailedAttempt(completionTimeMs);
+            verify(heartbeatState).reset();
+            verify(membershipManager).onFatalHeartbeatFailure();
+            ArgumentCaptor<ErrorEvent> errorEvent = ArgumentCaptor.forClass(ErrorEvent.class);
+            verify(backgroundEventHandler).add(errorEvent.capture());
+            assertEquals(
+                "The cluster does not support the STREAMS group " +
+                    "protocol or does not support the versions of the STREAMS group protocol used by this client " +
+                    "(used versions: " + StreamsGroupHeartbeatRequestData.LOWEST_SUPPORTED_VERSION + " to " +
+                    StreamsGroupHeartbeatRequestData.HIGHEST_SUPPORTED_VERSION + ").",
+                errorEvent.getValue().error().getMessage()
+            );
+            assertInstanceOf(UnsupportedVersionException.class, errorEvent.getValue().error());
+            verify(membershipManager).transitionToFatal();
+        }
+    }
+
+    @Test
+    public void testFatalFailureWhileSending() {
+        try (
+            final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
+                HeartbeatRequestState.class,
+                (mock, context) -> {
+                    when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
+                });
+            final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
+                StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
+        ) {
+            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
+            final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
+            when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+
+            final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+
+            assertEquals(1, result.unsentRequests.size());
+            final NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);
+            time.sleep(1234);
+            final long completionTimeMs = time.milliseconds();
+            final RuntimeException fatalException = new RuntimeException();
+            networkRequest.handler().onFailure(completionTimeMs, fatalException);
+            final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0);
+            verify(heartbeatRequestState).onFailedAttempt(completionTimeMs);
+            verify(heartbeatState).reset();
+            verify(membershipManager).onFatalHeartbeatFailure();
+            ArgumentCaptor<ErrorEvent> errorEvent = ArgumentCaptor.forClass(ErrorEvent.class);
+            verify(backgroundEventHandler).add(errorEvent.capture());
+            assertEquals(fatalException, errorEvent.getValue().error());
+            verify(membershipManager).transitionToFatal();
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+        value = Errors.class,
+        names = {"NOT_COORDINATOR", "COORDINATOR_NOT_AVAILABLE"}
+    )
+    public void testNotCoordinatorAndCoordinatorNotAvailableErrorResponse(final Errors error) {
+        try (
+            final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
+                HeartbeatRequestState.class,
+                (mock, context) -> {
+                    when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
+                });
+            final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
+                StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
+        ) {
+            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
+            final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
+            final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0);
+            when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+
+            final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+
+            assertEquals(1, result.unsentRequests.size());
+            final NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);
+            time.sleep(1234);
+            final long completionTimeMs = time.milliseconds();
+            final ClientResponse response = buildClientErrorResponse(error, "error message");
+            networkRequest.handler().onComplete(response);
+            verify(coordinatorRequestManager).markCoordinatorUnknown(
+                ((StreamsGroupHeartbeatResponse) response.responseBody()).data().errorMessage(),
+                completionTimeMs
+            );
+            verify(heartbeatState).reset();
+            verify(heartbeatRequestState).reset();
+            verify(membershipManager).onFatalHeartbeatFailure();
+        }
+    }
+
+    @Test
+    public void testCoordinatorLoadInProgressErrorResponse() {
+        try (
+            final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
+                HeartbeatRequestState.class,
+                (mock, context) -> {
+                    when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
+                });
+            final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
+                StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
+        ) {
+            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
+            final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
+            final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0);
+            when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+
+            final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+
+            assertEquals(1, result.unsentRequests.size());
+            final NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);
+            final ClientResponse response = buildClientErrorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, "message");
+            networkRequest.handler().onComplete(response);
+            verify(heartbeatState).reset();
+            verify(membershipManager).onFatalHeartbeatFailure();
+            verify(heartbeatRequestState, never()).reset();
+        }
+    }
+
+    @Test
+    public void testGroupAuthorizationFailedErrorResponse() {
+        try (
+            final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
+                HeartbeatRequestState.class,
+                (mock, context) -> {
+                    when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
+                });
+            final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
+                StreamsGroupHeartbeatRequestManager.HeartbeatState.class);
+            final LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class)
+        ) {
+            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
+            final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
+            when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+            when(membershipManager.groupId()).thenReturn(GROUP_ID);
+
+            final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+
+            assertEquals(1, result.unsentRequests.size());
+            final NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);
+            final ClientResponse response = buildClientErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED, "message");
+            networkRequest.handler().onComplete(response);
+            assertTrue(logAppender.getMessages("ERROR").stream()
+                .anyMatch(m -> m.contains("StreamsGroupHeartbeatRequest failed due to group authorization failure: " +
+                    "Not authorized to access group: " + GROUP_ID)));
+            verify(heartbeatState).reset();
+            ArgumentCaptor<ErrorEvent> errorEvent = ArgumentCaptor.forClass(ErrorEvent.class);
+            verify(backgroundEventHandler).add(errorEvent.capture());
+            assertEquals(
+                GroupAuthorizationException.forGroupId(GROUP_ID).getMessage(),
+                errorEvent.getValue().error().getMessage()
+            );
+            assertInstanceOf(GroupAuthorizationException.class, errorEvent.getValue().error());
+            verify(membershipManager).transitionToFatal();
+            verify(membershipManager).onFatalHeartbeatFailure();
+        }
+    }
+
+    @Test
+    public void testTopicAuthorizationFailedErrorResponse() {
+        try (
+            final MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = mockConstruction(
+                HeartbeatRequestState.class,
+                (mock, context) -> {
+                    when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
+                });
+            final MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction = mockConstruction(
+                StreamsGroupHeartbeatRequestManager.HeartbeatState.class);
+            final LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class)
+        ) {
+            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
+            final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
+            when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+            when(membershipManager.state()).thenReturn(MemberState.STABLE);
+            when(membershipManager.memberId()).thenReturn(MEMBER_ID);
+
+            final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+
+            assertEquals(1, result.unsentRequests.size());
+            final NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);
+            final String errorMessage = "message";
+            final ClientResponse response = buildClientErrorResponse(Errors.TOPIC_AUTHORIZATION_FAILED, errorMessage);
+            networkRequest.handler().onComplete(response);
+            assertTrue(logAppender.getMessages("ERROR").stream()
+                .anyMatch(m -> m.contains("StreamsGroupHeartbeatRequest failed for member " + MEMBER_ID +
+                    " with state " + MemberState.STABLE + " due to " + Errors.TOPIC_AUTHORIZATION_FAILED + ": " +
+                    errorMessage)));
+            verify(heartbeatState).reset();
+            ArgumentCaptor<ErrorEvent> errorEvent = ArgumentCaptor.forClass(ErrorEvent.class);
+            verify(backgroundEventHandler).add(errorEvent.capture());
+            assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.message(), errorEvent.getValue().error().getMessage());
+            assertInstanceOf(TopicAuthorizationException.class, errorEvent.getValue().error());
+            verify(membershipManager).onFatalHeartbeatFailure();
+        }
+    }
+
+    /**
+     * Verifies handling of the heartbeat error responses listed in the {@code @EnumSource}
+     * below, one run per error.
+     * <p>
+     * For each error the test expects the following:
+     * <ul>
+     *   <li>the heartbeat state is reset;</li>
+     *   <li>exactly one {@link ErrorEvent} is published, and its exception is an instance of
+     *       {@code error.exception().getClass()};</li>
+     *   <li>both {@code transitionToFatal()} and {@code onFatalHeartbeatFailure()} are called
+     *       on the membership manager.</li>
+     * </ul>
+     * The ERROR log and the event's message carry the response's error message. The exception
+     * is {@code UNSUPPORTED_VERSION}, which instead uses a client-built message naming the
+     * version range of {@code StreamsGroupHeartbeatRequestData}.
+     */
+    @ParameterizedTest
+    @EnumSource(
+        value = Errors.class,
+        names = {
+            "INVALID_REQUEST",
+            "GROUP_MAX_SIZE_REACHED",
+            "UNSUPPORTED_VERSION",
+            "STREAMS_INVALID_TOPOLOGY",
+            "STREAMS_INVALID_TOPOLOGY_EPOCH",
+            "STREAMS_TOPOLOGY_FENCED"
+        }
+    )
+    public void testKnownFatalErrorResponse(final Errors error) {
+        // canSendRequest is stubbed to true so that poll() produces a request.
+        try (
+            final MockedConstruction<HeartbeatRequestState> 
heartbeatRequestStateMockedConstruction = mockConstruction(
+                HeartbeatRequestState.class,
+                (mock, context) -> {
+                    
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
+                });
+            final 
MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> 
heartbeatStateMockedConstruction = mockConstruction(
+                StreamsGroupHeartbeatRequestManager.HeartbeatState.class);
+            final LogCaptureAppender logAppender = 
LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class)
+        ) {
+            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager 
= createStreamsGroupHeartbeatRequestManager();
+            final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
+            
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+
+            final NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
+            assertEquals(1, result.unsentRequests.size());
+            final NetworkClientDelegate.UnsentRequest networkRequest = 
result.unsentRequests.get(0);
+            final String errorMessageInResponse = "message";
+            final ClientResponse response = buildClientErrorResponse(error, 
errorMessageInResponse);
+            networkRequest.handler().onComplete(response);
+            verify(heartbeatState).reset();
+            ArgumentCaptor<ErrorEvent> errorEvent = 
ArgumentCaptor.forClass(ErrorEvent.class);
+            verify(backgroundEventHandler).add(errorEvent.capture());
+            // UNSUPPORTED_VERSION is reported with a client-built message rather than the
+            // response's error message; every other error propagates the response's message.
+            if (error == Errors.UNSUPPORTED_VERSION) {
+                final String errorMessage = "The cluster does not support the 
STREAMS group " +
+                    "protocol or does not support the versions of the STREAMS 
group protocol used by this client " +
+                    "(used versions: " + 
StreamsGroupHeartbeatRequestData.LOWEST_SUPPORTED_VERSION + " to " +
+                    StreamsGroupHeartbeatRequestData.HIGHEST_SUPPORTED_VERSION 
+ ").";
+                assertTrue(logAppender.getMessages("ERROR").stream()
+                    .anyMatch(m -> m.contains("StreamsGroupHeartbeatRequest 
failed due to " +
+                        error + ": " + errorMessage)));
+                assertEquals(errorMessage, 
errorEvent.getValue().error().getMessage());
+            } else {
+                assertTrue(logAppender.getMessages("ERROR").stream()
+                    .anyMatch(m -> m.contains("StreamsGroupHeartbeatRequest 
failed due to " +
+                        error + ": " + errorMessageInResponse)));
+                assertEquals(errorMessageInResponse, 
errorEvent.getValue().error().getMessage());
+            }
+            assertInstanceOf(error.exception().getClass(), 
errorEvent.getValue().error());
+            verify(membershipManager).transitionToFatal();
+            verify(membershipManager).onFatalHeartbeatFailure();
+        }
+    }
+
+    /**
+     * Verifies that a FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID heartbeat response has the
+     * following effects:
+     * <ul>
+     *   <li>both the heartbeat state and the heartbeat request state are reset;</li>
+     *   <li>{@code onFenced()} and {@code onFatalHeartbeatFailure()} are called on the
+     *       membership manager.</li>
+     * </ul>
+     * NOTE(review): this test does not assert whether an {@link ErrorEvent} is published for
+     * these errors.
+     */
+    @ParameterizedTest
+    @EnumSource(
+        value = Errors.class,
+        names = {"FENCED_MEMBER_EPOCH", "UNKNOWN_MEMBER_ID"}
+    )
+    public void testFencedMemberOrUnknownMemberIdErrorResponse(final Errors 
error) {
+        // The constructed HeartbeatRequestState mock is fetched below so that its reset()
+        // can be verified; canSendRequest is stubbed to true so that poll() produces a request.
+        try (
+            final MockedConstruction<HeartbeatRequestState> 
heartbeatRequestStateMockedConstruction = mockConstruction(
+                HeartbeatRequestState.class,
+                (mock, context) -> {
+                    
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
+                });
+            final 
MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> 
heartbeatStateMockedConstruction = mockConstruction(
+                StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
+        ) {
+            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager 
= createStreamsGroupHeartbeatRequestManager();
+            final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
+            final HeartbeatRequestState heartbeatRequestState = 
heartbeatRequestStateMockedConstruction.constructed().get(0);
+            
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+
+            final NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
+            assertEquals(1, result.unsentRequests.size());
+            final NetworkClientDelegate.UnsentRequest networkRequest = 
result.unsentRequests.get(0);
+            final String errorMessage = "message";
+            final ClientResponse response = buildClientErrorResponse(error, 
errorMessage);
+            networkRequest.handler().onComplete(response);
+            verify(heartbeatState).reset();
+            verify(heartbeatRequestState).reset();
+            verify(membershipManager).onFenced();
+            verify(membershipManager).onFatalHeartbeatFailure();
+        }
+    }
+
+    /**
+     * Verifies the fallback handling for every error supplied by {@code provideOtherErrors()},
+     * i.e. every {@link Errors} value not in its {@code consideredErrors} set.
+     * <p>
+     * For each such error the test expects the following:
+     * <ul>
+     *   <li>an ERROR log containing "failed due to unexpected error";</li>
+     *   <li>the heartbeat state is reset;</li>
+     *   <li>one {@link ErrorEvent} is published, and its exception is an instance of
+     *       {@code error.exception().getClass()} carrying the response's error message;</li>
+     *   <li>{@code transitionToFatal()} and {@code onFatalHeartbeatFailure()} are called.</li>
+     * </ul>
+     */
+    @ParameterizedTest
+    @MethodSource("provideOtherErrors")
+    public void testOtherErrorResponse(final Errors error) {
+        // canSendRequest is stubbed to true so that poll() produces a request.
+        try (
+            final MockedConstruction<HeartbeatRequestState> 
heartbeatRequestStateMockedConstruction = mockConstruction(
+                HeartbeatRequestState.class,
+                (mock, context) -> {
+                    
when(mock.canSendRequest(time.milliseconds())).thenReturn(true);
+                });
+            final 
MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> 
heartbeatStateMockedConstruction = mockConstruction(
+                StreamsGroupHeartbeatRequestManager.HeartbeatState.class);
+            final LogCaptureAppender logAppender = 
LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class)
+        ) {
+            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager 
= createStreamsGroupHeartbeatRequestManager();
+            final StreamsGroupHeartbeatRequestManager.HeartbeatState 
heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
+            
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+
+            final NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
+            assertEquals(1, result.unsentRequests.size());
+            final NetworkClientDelegate.UnsentRequest networkRequest = 
result.unsentRequests.get(0);
+            final String errorMessage = "message";
+            final ClientResponse response = buildClientErrorResponse(error, 
errorMessage);
+            networkRequest.handler().onComplete(response);
+            assertTrue(logAppender.getMessages("ERROR").stream()
+                .anyMatch(m -> m.contains("StreamsGroupHeartbeatRequest failed 
due to unexpected error")));
+            verify(heartbeatState).reset();
+            ArgumentCaptor<ErrorEvent> errorEvent = 
ArgumentCaptor.forClass(ErrorEvent.class);
+            verify(backgroundEventHandler).add(errorEvent.capture());
+            assertEquals(errorMessage, 
errorEvent.getValue().error().getMessage());
+            assertInstanceOf(error.exception().getClass(), 
errorEvent.getValue().error());
+            verify(membershipManager).transitionToFatal();
+            verify(membershipManager).onFatalHeartbeatFailure();
+        }
+    }
+
+    /**
+     * Supplies, as JUnit {@link Arguments}, every {@link Errors} value that is not in
+     * {@code consideredErrors}. These are the errors exercised by {@code testOtherErrorResponse}.
+     * <p>
+     * {@code consideredErrors} contains {@code NONE} plus the errors that are exercised by
+     * dedicated tests. NOTE(review): the coordinator errors (NOT_COORDINATOR,
+     * COORDINATOR_NOT_AVAILABLE, COORDINATOR_LOAD_IN_PROGRESS) are presumably covered by tests
+     * outside this view; confirm.
+     * <p>
+     * When an error gets its own test, add it to this set as well. Otherwise it will also be
+     * parameterized into {@code testOtherErrorResponse}.
+     */
+    private static Stream<Arguments> provideOtherErrors() {
+        final Set<Errors> consideredErrors = Set.of(
+            Errors.NONE,
+            Errors.NOT_COORDINATOR,
+            Errors.COORDINATOR_NOT_AVAILABLE,
+            Errors.COORDINATOR_LOAD_IN_PROGRESS,
+            Errors.GROUP_AUTHORIZATION_FAILED,
+            Errors.TOPIC_AUTHORIZATION_FAILED,
+            Errors.INVALID_REQUEST,
+            Errors.GROUP_MAX_SIZE_REACHED,
+            Errors.FENCED_MEMBER_EPOCH,
+            Errors.UNKNOWN_MEMBER_ID,
+            Errors.UNSUPPORTED_VERSION,
+            Errors.STREAMS_INVALID_TOPOLOGY,
+            Errors.STREAMS_INVALID_TOPOLOGY_EPOCH,
+            Errors.STREAMS_TOPOLOGY_FENCED);
+        return Arrays.stream(Errors.values())
+            .filter(error -> !consideredErrors.contains(error))
+            .map(Arguments::of);
+    }
+
     private static ConsumerConfig config() {
         Properties prop = new Properties();
         prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
@@ -1014,6 +1421,24 @@ class StreamsGroupHeartbeatRequestManagerTest {
         );
     }
 
+    /**
+     * Builds a {@link ClientResponse} for a STREAMS_GROUP_HEARTBEAT request. Its
+     * {@link StreamsGroupHeartbeatResponse} body carries the given error code and error message.
+     * <p>
+     * Both long arguments (presumably the created and received timestamps) are
+     * {@code time.milliseconds()}. The remaining arguments are fixed placeholders. Presumably
+     * they are:
+     * <ul>
+     *   <li>request version 1, an empty client id and correlation id 1;</li>
+     *   <li>no callback;</li>
+     *   <li>destination "-1";</li>
+     *   <li>not disconnected;</li>
+     *   <li>no version-mismatch or authentication exception.</li>
+     * </ul>
+     * Confirm this against the {@code RequestHeader} and {@code ClientResponse} constructors.
+     *
+     * @param error        error whose code is set on the response body
+     * @param errorMessage error message set on the response body
+     * @return a response to pass to a request handler's {@code onComplete}
+     */
+    private ClientResponse buildClientErrorResponse(final Errors error, final 
String errorMessage) {
+        return new ClientResponse(
+            new RequestHeader(ApiKeys.STREAMS_GROUP_HEARTBEAT, (short) 1, "", 
1),
+            null,
+            "-1",
+            time.milliseconds(),
+            time.milliseconds(),
+            false,
+            null,
+            null,
+            new StreamsGroupHeartbeatResponse(
+                new StreamsGroupHeartbeatResponseData()
+                    .setErrorCode(error.code())
+                    .setErrorMessage(errorMessage)
+            )
+        );
+    }
+
     private static void assertTaskIdsEquals(final 
List<StreamsGroupHeartbeatRequestData.TaskIds> expected,
                                             final 
List<StreamsGroupHeartbeatRequestData.TaskIds> actual) {
         List<StreamsGroupHeartbeatRequestData.TaskIds> sortedExpected = 
expected.stream()
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
index 5845da7bede..896a76ac209 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
@@ -34,6 +34,8 @@ import org.apache.kafka.common.utils.Time;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
@@ -1471,22 +1473,14 @@ public class StreamsMembershipManagerTest {
         membershipManager.onHeartbeatRequestGenerated();
         assertFalse(groupLeft.isDone());
 
-        membershipManager.onHeartbeatFailure(true);
+        membershipManager.onRetriableHeartbeatFailure();
 
         assertTrue(groupLeft.isDone());
     }
 
-    @Test
-    public void testOnHeartbeatFatalFailure() {
-        testOnHeartbeatFailure(false);
-    }
-
-    @Test
-    public void testOnHeartbeatRetriableFailure() {
-        testOnHeartbeatFailure(true);
-    }
-
-    private void testOnHeartbeatFailure(boolean retriable) {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testOnHeartbeatFailure(boolean retriable) {
         final MetricName failedRebalanceTotalMetricName = metrics.metricName(
             "failed-rebalance-total",
             CONSUMER_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX
@@ -1503,7 +1497,11 @@ public class StreamsMembershipManagerTest {
         final double failedRebalancesTotalBefore = (double) 
metrics.metric(failedRebalanceTotalMetricName).metricValue();
         assertEquals(0L, failedRebalancesTotalBefore);
 
-        membershipManager.onHeartbeatFailure(retriable);
+        if (retriable) {
+            membershipManager.onRetriableHeartbeatFailure();
+        } else {
+            membershipManager.onFatalHeartbeatFailure();
+        }
 
         final double failedRebalancesTotalAfter = (double) 
metrics.metric(failedRebalanceTotalMetricName).metricValue();
         assertEquals(retriable ? 0L : 1L, failedRebalancesTotalAfter);


Reply via email to