This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7bff678699a KAFKA-18859 honor the error message of 
UnregisterBrokerResponse (#19027)
7bff678699a is described below

commit 7bff678699ae5f69c18e2f86a43acff0d160a606
Author: Ken Huang <[email protected]>
AuthorDate: Sun Mar 16 03:06:01 2025 +0800

    KAFKA-18859 honor the error message of UnregisterBrokerResponse (#19027)
    
    Reviewers: Ismael Juma <[email protected]>, TengYao Chi 
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  6 ++---
 .../common/requests/UnregisterBrokerRequest.java   |  3 ++-
 .../kafka/common/requests/RequestResponseTest.java | 27 ++++++++++++++++++----
 .../main/scala/kafka/server/ControllerApis.scala   |  4 +---
 4 files changed, 29 insertions(+), 11 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index f6f484ca7bf..c4dcceae63c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4794,11 +4794,11 @@ public class KafkaAdminClient extends AdminClient {
                         future.complete(null);
                         break;
                     case REQUEST_TIMED_OUT:
-                        throw error.exception();
+                        throw error.exception(response.data().errorMessage());
                     default:
                         log.error("Unregister broker request for broker ID {} 
failed: {}",
-                            brokerId, error.message());
-                        future.completeExceptionally(error.exception());
+                            brokerId, response.data().errorMessage());
+                        
future.completeExceptionally(error.exception(response.data().errorMessage()));
                         break;
                 }
             }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerRequest.java
index 253499f85af..d0cbf715ddb 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerRequest.java
@@ -57,7 +57,8 @@ public class UnregisterBrokerRequest extends AbstractRequest {
         Errors error = Errors.forException(e);
         return new UnregisterBrokerResponse(new UnregisterBrokerResponseData()
                 .setThrottleTimeMs(throttleTimeMs)
-                .setErrorCode(error.code()));
+                .setErrorCode(error.code())
+                .setErrorMessage(e.getMessage()));
     }
 
     public static UnregisterBrokerRequest parse(ByteBuffer buffer, short 
version) {
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 1a5050907c0..cca885f591c 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -315,7 +315,7 @@ import static 
org.apache.kafka.common.protocol.ApiKeys.OFFSET_FETCH;
 import static org.apache.kafka.common.protocol.ApiKeys.PRODUCE;
 import static org.apache.kafka.common.protocol.ApiKeys.SASL_AUTHENTICATE;
 import static org.apache.kafka.common.protocol.ApiKeys.SYNC_GROUP;
-import static org.apache.kafka.common.protocol.ApiKeys.WRITE_TXN_MARKERS;
+import static org.apache.kafka.common.protocol.ApiKeys.UNREGISTER_BROKER;
 import static 
org.apache.kafka.common.requests.EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2;
 import static 
org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -336,11 +336,13 @@ public class RequestResponseTest {
     public void testSerialization() {
         Map<ApiKeys, List<Short>> toSkip = new HashMap<>();
         // It's not possible to create a MetadataRequest v0 via the builder
-        toSkip.put(METADATA, singletonList((short) 0));
+        toSkip.put(METADATA, List.of((short) 0));
         // DescribeLogDirsResponse v0, v1 and v2 don't have a top level error 
field
-        toSkip.put(DESCRIBE_LOG_DIRS, Arrays.asList((short) 0, (short) 1, 
(short) 2));
+        toSkip.put(DESCRIBE_LOG_DIRS, List.of((short) 0, (short) 1, (short) 
2));
         // ElectLeaders v0 does not have a top level error field, when 
accessing it, it defaults to NONE
-        toSkip.put(ELECT_LEADERS, singletonList((short) 0));
+        toSkip.put(ELECT_LEADERS, List.of((short) 0));
+        // UnregisterBroker v0 contains the error message in the response
+        toSkip.put(UNREGISTER_BROKER, List.of((short) 0));
 
         for (ApiKeys apikey : ApiKeys.values()) {
             for (short version : apikey.allVersions()) {
@@ -838,6 +840,23 @@ public class RequestResponseTest {
         }
     }
 
+    @Test
+    public void testUnregisterBrokerResponseWithUnknownServerError() {
+        UnregisterBrokerRequest request = new UnregisterBrokerRequest.Builder(
+            new UnregisterBrokerRequestData()
+        ).build((short) 0);
+        String customerErrorMessage = "customer error message";
+        
+        UnregisterBrokerResponse response = request.getErrorResponse(
+            0, 
+            new RuntimeException(customerErrorMessage)
+        );
+
+        assertEquals(0, response.throttleTimeMs());
+        assertEquals(Errors.UNKNOWN_SERVER_ERROR.code(), 
response.data().errorCode());
+        assertEquals(customerErrorMessage, response.data().errorMessage());
+    }
+
     private ApiVersionsResponse defaultApiVersionsResponse() {
         return 
TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.BROKER);
     }
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala 
b/core/src/main/scala/kafka/server/ControllerApis.scala
index 1e343e776d5..0b3c1ea1dac 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -642,9 +642,7 @@ class ControllerApis(
       def createResponseCallback(requestThrottleMs: Int,
                                  e: Throwable): UnregisterBrokerResponse = {
         if (e != null) {
-          new UnregisterBrokerResponse(new UnregisterBrokerResponseData().
-            setThrottleTimeMs(requestThrottleMs).
-            setErrorCode(Errors.forException(e).code))
+          decommissionRequest.getErrorResponse(requestThrottleMs, e)
         } else {
           new UnregisterBrokerResponse(new UnregisterBrokerResponseData().
             setThrottleTimeMs(requestThrottleMs))

Reply via email to