This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 86baac103b7 MINOR: Improve client error messages for share groups not
enabled (#19688)
86baac103b7 is described below
commit 86baac103b7d3910952307006059555eb173fdd5
Author: Andrew Schofield <[email protected]>
AuthorDate: Tue May 13 10:42:40 2025 +0100
MINOR: Improve client error messages for share groups not enabled (#19688)
As mentioned in
https://github.com/apache/kafka/pull/19378#pullrequestreview-2775598123,
the error messages for a 4.1 share consumer could be clearer about the
different cases in which it cannot successfully join a share group.
This PR uses a distinct error message for each case, such as an
out-of-date cluster or share groups simply not being enabled.
Reviewers: Apoorv Mittal <[email protected]>
---
.../consumer/internals/ShareHeartbeatRequestManager.java | 7 +++++--
.../consumer/internals/ShareHeartbeatRequestManagerTest.java | 11 ++++++-----
2 files changed, 11 insertions(+), 7 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java
index 21d598afb4c..f46b6f72c87 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java
@@ -54,6 +54,9 @@ public class ShareHeartbeatRequestManager extends
AbstractHeartbeatRequestManage
public static final String SHARE_PROTOCOL_NOT_SUPPORTED_MSG = "The cluster
does not support the share group protocol. " +
"To use share groups, the cluster must have the share group protocol
enabled.";
+ public static final String SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG = "The
cluster does not support the share group protocol " +
+ "using ShareGroupHeartbeat API version 1 or later. This version of the
API was introduced in Apache Kafka v4.1.";
+
public ShareHeartbeatRequestManager(
final LogContext logContext,
final Time time,
@@ -93,8 +96,8 @@ public class ShareHeartbeatRequestManager extends
AbstractHeartbeatRequestManage
public boolean handleSpecificFailure(Throwable exception) {
boolean errorHandled = false;
if (exception instanceof UnsupportedVersionException) {
- logger.error("{} failed due to {}: {}", heartbeatRequestName(),
exception.getMessage(), SHARE_PROTOCOL_NOT_SUPPORTED_MSG);
- handleFatalFailure(new
UnsupportedVersionException(SHARE_PROTOCOL_NOT_SUPPORTED_MSG, exception));
+ logger.error("{} failed due to {}: {}", heartbeatRequestName(),
exception.getMessage(), SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG);
+ handleFatalFailure(new
UnsupportedVersionException(SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG,
exception));
errorHandled = true;
}
return errorHandled;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
index 856cdd29493..8952271b250 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
@@ -60,6 +60,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import static
org.apache.kafka.clients.consumer.internals.ShareHeartbeatRequestManager.SHARE_PROTOCOL_NOT_SUPPORTED_MSG;
+import static
org.apache.kafka.clients.consumer.internals.ShareHeartbeatRequestManager.SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -439,7 +440,7 @@ public class ShareHeartbeatRequestManagerTest {
}
@ParameterizedTest
- @ValueSource(strings = {SHARE_PROTOCOL_NOT_SUPPORTED_MSG})
+ @ValueSource(strings = {SHARE_PROTOCOL_VERSION_NOT_SUPPORTED_MSG})
public void testUnsupportedVersionGeneratedOnTheClient(String errorMsg) {
mockResponseWithException(new UnsupportedVersionException(errorMsg),
false);
@@ -692,11 +693,11 @@ public class ShareHeartbeatRequestManagerTest {
private ClientResponse createHeartbeatResponseWithException(
final NetworkClientDelegate.UnsentRequest request,
final UnsupportedVersionException exception,
- final boolean isFromClient
+ final boolean isFromBroker
) {
ShareGroupHeartbeatResponse response = null;
- if (!isFromClient) {
- response = new ShareGroupHeartbeatResponse(null);
+ if (isFromBroker) {
+ response = new ShareGroupHeartbeatResponse(new
ShareGroupHeartbeatResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code()));
}
return new ClientResponse(
new RequestHeader(ApiKeys.SHARE_GROUP_HEARTBEAT,
ApiKeys.SHARE_GROUP_HEARTBEAT.latestVersion(), "client-id", 1),
@@ -705,7 +706,7 @@ public class ShareHeartbeatRequestManagerTest {
time.milliseconds(),
time.milliseconds(),
false,
- exception,
+ isFromBroker ? null : exception,
null,
response);
}