This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 86baac103b7 MINOR: Improve client error messages for share groups not enabled (#19688)
86baac103b7 is described below

commit 86baac103b7d3910952307006059555eb173fdd5
Author: Andrew Schofield <[email protected]>
AuthorDate: Tue May 13 10:42:40 2025 +0100

    MINOR: Improve client error messages for share groups not enabled (#19688)
    
    As mentioned in
    https://github.com/apache/kafka/pull/19378#pullrequestreview-2775598123,
    the error messages for a 4.1 share consumer could be clearer about the
    different reasons why it cannot successfully join a share group.
    
    This PR uses a distinct error message for each case, such as an
    out-of-date cluster or share groups simply not being enabled.
    
    Reviewers: Apoorv Mittal <[email protected]>
---
 .../consumer/internals/ShareHeartbeatRequestManager.java      |  7 +++++--
 .../consumer/internals/ShareHeartbeatRequestManagerTest.java  | 11 ++++++-----
 2 files changed, 11 insertions(+), 7 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java
index 21d598afb4c..f46b6f72c87 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java
@@ -54,6 +54,9 @@ public class ShareHeartbeatRequestManager extends 
AbstractHeartbeatRequestManage
     public static final String SHARE_PROTOCOL_NOT_SUPPORTED_MSG = "The cluster 
does not support the share group protocol. " +
         "To use share groups, the cluster must have the share group protocol 
enabled.";
 
+    public static final String SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG = "The 
cluster does not support the share group protocol " +
+        "using ShareGroupHeartbeat API version 1 or later. This version of the 
API was introduced in Apache Kafka v4.1.";
+
     public ShareHeartbeatRequestManager(
             final LogContext logContext,
             final Time time,
@@ -93,8 +96,8 @@ public class ShareHeartbeatRequestManager extends 
AbstractHeartbeatRequestManage
     public boolean handleSpecificFailure(Throwable exception) {
         boolean errorHandled = false;
         if (exception instanceof UnsupportedVersionException) {
-            logger.error("{} failed due to {}: {}", heartbeatRequestName(), 
exception.getMessage(), SHARE_PROTOCOL_NOT_SUPPORTED_MSG);
-            handleFatalFailure(new 
UnsupportedVersionException(SHARE_PROTOCOL_NOT_SUPPORTED_MSG, exception));
+            logger.error("{} failed due to {}: {}", heartbeatRequestName(), 
exception.getMessage(), SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG);
+            handleFatalFailure(new 
UnsupportedVersionException(SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG, 
exception));
             errorHandled = true;
         }
         return errorHandled;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
index 856cdd29493..8952271b250 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
@@ -60,6 +60,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.kafka.clients.consumer.internals.ShareHeartbeatRequestManager.SHARE_PROTOCOL_NOT_SUPPORTED_MSG;
+import static 
org.apache.kafka.clients.consumer.internals.ShareHeartbeatRequestManager.SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -439,7 +440,7 @@ public class ShareHeartbeatRequestManagerTest {
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {SHARE_PROTOCOL_NOT_SUPPORTED_MSG})
+    @ValueSource(strings = {SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG})
     public void testUnsupportedVersionGeneratedOnTheClient(String errorMsg) {
         mockResponseWithException(new UnsupportedVersionException(errorMsg), 
false);
 
@@ -692,11 +693,11 @@ public class ShareHeartbeatRequestManagerTest {
     private ClientResponse createHeartbeatResponseWithException(
             final NetworkClientDelegate.UnsentRequest request,
             final UnsupportedVersionException exception,
-            final boolean isFromClient
+            final boolean isFromBroker
     ) {
         ShareGroupHeartbeatResponse response = null;
-        if (!isFromClient) {
-            response = new ShareGroupHeartbeatResponse(null);
+        if (isFromBroker) {
+            response = new ShareGroupHeartbeatResponse(new 
ShareGroupHeartbeatResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code()));
         }
         return new ClientResponse(
                 new RequestHeader(ApiKeys.SHARE_GROUP_HEARTBEAT, 
ApiKeys.SHARE_GROUP_HEARTBEAT.latestVersion(), "client-id", 1),
@@ -705,7 +706,7 @@ public class ShareHeartbeatRequestManagerTest {
                 time.milliseconds(),
                 time.milliseconds(),
                 false,
-                exception,
+                isFromBroker ? null : exception,
                 null,
                 response);
     }

Reply via email to