This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c26b09c6092 KAFKA-18904: [1/N] Change ListClientMetricsResources API 
to ListConfigResources (#19493)
c26b09c6092 is described below

commit c26b09c6092837c47795d645526d3d098d0bd07e
Author: PoAn Yang <[email protected]>
AuthorDate: Thu May 15 17:39:00 2025 -0500

    KAFKA-18904: [1/N] Change ListClientMetricsResources API to 
ListConfigResources (#19493)
    
    * Change `ListClientMetricsResourcesRequest.json` to
    `ListConfigResourcesRequest.json`.
    * Change `ListClientMetricsResourcesResponse.json` to
    `ListConfigResourcesResponse.json`.
    * Change `ListClientMetricsResourcesRequest.java` to
    `ListConfigResourcesRequest.java`.
    * Change `ListClientMetricsResourcesResponse.java` to
    `ListConfigResourcesResponse.java`.
    * Change `KafkaApis` to handle both v0 (`ListClientMetricsResourcesRequest`)
    and v1 (`ListConfigResourcesRequest`) requests.
    
    Reviewers: Andrew Schofield <[email protected]>
    
    ---------
    
    Signed-off-by: PoAn Yang <[email protected]>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  15 +-
 .../org/apache/kafka/common/protocol/ApiKeys.java  |   2 +-
 .../kafka/common/requests/AbstractRequest.java     |   4 +-
 .../kafka/common/requests/AbstractResponse.java    |   4 +-
 .../ListClientMetricsResourcesRequest.java         |  75 -------
 .../requests/ListConfigResourcesRequest.java       |  95 +++++++++
 ...ponse.java => ListConfigResourcesResponse.java} |  33 ++-
 ...equest.json => ListConfigResourcesRequest.json} |   9 +-
 ...ponse.json => ListConfigResourcesResponse.json} |  16 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  44 ++--
 .../kafka/common/requests/RequestResponseTest.java |  18 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  62 +++++-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 233 +++++++++++++++++++--
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   4 +-
 .../coordinator/group/GroupConfigManager.java      |   5 +
 .../apache/kafka/network/RequestConvertToJson.java |  16 +-
 16 files changed, 462 insertions(+), 173 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 87a951fe26e..0abad889c85 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -159,7 +159,7 @@ import 
org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.U
 import 
org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
 import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
-import org.apache.kafka.common.message.ListClientMetricsResourcesRequestData;
+import org.apache.kafka.common.message.ListConfigResourcesRequestData;
 import org.apache.kafka.common.message.ListGroupsRequestData;
 import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
@@ -233,8 +233,8 @@ import 
org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
 import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
 import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
-import org.apache.kafka.common.requests.ListClientMetricsResourcesRequest;
-import org.apache.kafka.common.requests.ListClientMetricsResourcesResponse;
+import org.apache.kafka.common.requests.ListConfigResourcesRequest;
+import org.apache.kafka.common.requests.ListConfigResourcesResponse;
 import org.apache.kafka.common.requests.ListGroupsRequest;
 import org.apache.kafka.common.requests.ListGroupsResponse;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
@@ -4887,13 +4887,16 @@ public class KafkaAdminClient extends AdminClient {
             new LeastLoadedNodeProvider()) {
 
             @Override
-            ListClientMetricsResourcesRequest.Builder createRequest(int 
timeoutMs) {
-                return new ListClientMetricsResourcesRequest.Builder(new 
ListClientMetricsResourcesRequestData());
+            ListConfigResourcesRequest.Builder createRequest(int timeoutMs) {
+                return new ListConfigResourcesRequest.Builder(
+                    new ListConfigResourcesRequestData()
+                        
.setResourceTypes(List.of(ConfigResource.Type.CLIENT_METRICS.id()))
+                );
             }
 
             @Override
             void handleResponse(AbstractResponse abstractResponse) {
-                ListClientMetricsResourcesResponse response = 
(ListClientMetricsResourcesResponse) abstractResponse;
+                ListConfigResourcesResponse response = 
(ListConfigResourcesResponse) abstractResponse;
                 if (response.error().isFailure()) {
                     future.completeExceptionally(response.error().exception());
                 } else {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 8534561af8b..89b952e6ce7 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -116,7 +116,7 @@ public enum ApiKeys {
     GET_TELEMETRY_SUBSCRIPTIONS(ApiMessageType.GET_TELEMETRY_SUBSCRIPTIONS),
     PUSH_TELEMETRY(ApiMessageType.PUSH_TELEMETRY),
     ASSIGN_REPLICAS_TO_DIRS(ApiMessageType.ASSIGN_REPLICAS_TO_DIRS),
-    
LIST_CLIENT_METRICS_RESOURCES(ApiMessageType.LIST_CLIENT_METRICS_RESOURCES),
+    LIST_CONFIG_RESOURCES(ApiMessageType.LIST_CONFIG_RESOURCES),
     DESCRIBE_TOPIC_PARTITIONS(ApiMessageType.DESCRIBE_TOPIC_PARTITIONS),
     SHARE_GROUP_HEARTBEAT(ApiMessageType.SHARE_GROUP_HEARTBEAT),
     SHARE_GROUP_DESCRIBE(ApiMessageType.SHARE_GROUP_DESCRIBE),
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 9738711e73a..750de2050f4 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -316,8 +316,8 @@ public abstract class AbstractRequest implements 
AbstractRequestResponse {
                 return PushTelemetryRequest.parse(readable, apiVersion);
             case ASSIGN_REPLICAS_TO_DIRS:
                 return AssignReplicasToDirsRequest.parse(readable, apiVersion);
-            case LIST_CLIENT_METRICS_RESOURCES:
-                return ListClientMetricsResourcesRequest.parse(readable, 
apiVersion);
+            case LIST_CONFIG_RESOURCES:
+                return ListConfigResourcesRequest.parse(readable, apiVersion);
             case DESCRIBE_TOPIC_PARTITIONS:
                 return DescribeTopicPartitionsRequest.parse(readable, 
apiVersion);
             case SHARE_GROUP_HEARTBEAT:
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 01cc03c12b6..bc313078d74 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -253,8 +253,8 @@ public abstract class AbstractResponse implements 
AbstractRequestResponse {
                 return PushTelemetryResponse.parse(readable, version);
             case ASSIGN_REPLICAS_TO_DIRS:
                 return AssignReplicasToDirsResponse.parse(readable, version);
-            case LIST_CLIENT_METRICS_RESOURCES:
-                return ListClientMetricsResourcesResponse.parse(readable, 
version);
+            case LIST_CONFIG_RESOURCES:
+                return ListConfigResourcesResponse.parse(readable, version);
             case DESCRIBE_TOPIC_PARTITIONS:
                 return DescribeTopicPartitionsResponse.parse(readable, 
version);
             case SHARE_GROUP_HEARTBEAT:
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ListClientMetricsResourcesRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ListClientMetricsResourcesRequest.java
deleted file mode 100644
index 417740d0ffa..00000000000
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ListClientMetricsResourcesRequest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.message.ListClientMetricsResourcesRequestData;
-import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.Readable;
-
-public class ListClientMetricsResourcesRequest extends AbstractRequest {
-    public static class Builder extends 
AbstractRequest.Builder<ListClientMetricsResourcesRequest> {
-        public final ListClientMetricsResourcesRequestData data;
-
-        public Builder(ListClientMetricsResourcesRequestData data) {
-            super(ApiKeys.LIST_CLIENT_METRICS_RESOURCES);
-            this.data = data;
-        }
-
-        @Override
-        public ListClientMetricsResourcesRequest build(short version) {
-            return new ListClientMetricsResourcesRequest(data, version);
-        }
-
-        @Override
-        public String toString() {
-            return data.toString();
-        }
-    }
-
-    private final ListClientMetricsResourcesRequestData data;
-
-    private 
ListClientMetricsResourcesRequest(ListClientMetricsResourcesRequestData data, 
short version) {
-        super(ApiKeys.LIST_CLIENT_METRICS_RESOURCES, version);
-        this.data = data;
-    }
-
-    public ListClientMetricsResourcesRequestData data() {
-        return data;
-    }
-
-    @Override
-    public ListClientMetricsResourcesResponse getErrorResponse(int 
throttleTimeMs, Throwable e) {
-        Errors error = Errors.forException(e);
-        ListClientMetricsResourcesResponseData response = new 
ListClientMetricsResourcesResponseData()
-            .setErrorCode(error.code())
-            .setThrottleTimeMs(throttleTimeMs);
-        return new ListClientMetricsResourcesResponse(response);
-    }
-
-    public static ListClientMetricsResourcesRequest parse(Readable readable, 
short version) {
-        return new ListClientMetricsResourcesRequest(new 
ListClientMetricsResourcesRequestData(
-            readable, version), version);
-    }
-
-    @Override
-    public String toString(boolean verbose) {
-        return data.toString();
-    }
-
-}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesRequest.java
new file mode 100644
index 00000000000..3af70938843
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesRequest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.ListConfigResourcesRequestData;
+import org.apache.kafka.common.message.ListConfigResourcesResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.Readable;
+
+import java.util.Set;
+
+public class ListConfigResourcesRequest extends AbstractRequest {
+    public static class Builder extends 
AbstractRequest.Builder<ListConfigResourcesRequest> {
+        public final ListConfigResourcesRequestData data;
+
+        public Builder(ListConfigResourcesRequestData data) {
+            super(ApiKeys.LIST_CONFIG_RESOURCES);
+            this.data = data;
+        }
+
+        @Override
+        public ListConfigResourcesRequest build(short version) {
+            return new ListConfigResourcesRequest(data, version);
+        }
+
+        @Override
+        public String toString() {
+            return data.toString();
+        }
+    }
+
+    private final ListConfigResourcesRequestData data;
+
+    private ListConfigResourcesRequest(ListConfigResourcesRequestData data, 
short version) {
+        super(ApiKeys.LIST_CONFIG_RESOURCES, version);
+        this.data = data;
+    }
+
+    public ListConfigResourcesRequestData data() {
+        return data;
+    }
+
+    @Override
+    public ListConfigResourcesResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
+        Errors error = Errors.forException(e);
+        ListConfigResourcesResponseData response = new 
ListConfigResourcesResponseData()
+            .setErrorCode(error.code())
+            .setThrottleTimeMs(throttleTimeMs);
+        return new ListConfigResourcesResponse(response);
+    }
+
+    public static ListConfigResourcesRequest parse(Readable readable, short 
version) {
+        return new ListConfigResourcesRequest(new 
ListConfigResourcesRequestData(
+            readable, version), version);
+    }
+
+    @Override
+    public String toString(boolean verbose) {
+        return data.toString();
+    }
+
+    /**
+     * Return the supported config resource types in different request version.
+     * If there is a new config resource type, the ListConfigResourcesRequest 
should bump a new request version to include it.
+     * For v0, the supported config resource types contain CLIENT_METRICS (16).
+     * For v1, the supported config resource types contain TOPIC (2), BROKER 
(4), BROKER_LOGGER (8), CLIENT_METRICS (16), and GROUP (32).
+     */
+    public Set<Byte> supportedResourceTypes() {
+        return version() == 0 ?
+            Set.of(ConfigResource.Type.CLIENT_METRICS.id()) :
+            Set.of(
+                ConfigResource.Type.TOPIC.id(),
+                ConfigResource.Type.BROKER.id(),
+                ConfigResource.Type.BROKER_LOGGER.id(),
+                ConfigResource.Type.CLIENT_METRICS.id(),
+                ConfigResource.Type.GROUP.id()
+            );
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ListClientMetricsResourcesResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesResponse.java
similarity index 64%
rename from 
clients/src/main/java/org/apache/kafka/common/requests/ListClientMetricsResourcesResponse.java
rename to 
clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesResponse.java
index c2b3b1601ed..36a4a807f7f 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ListClientMetricsResourcesResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesResponse.java
@@ -17,7 +17,8 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.clients.admin.ClientMetricsResourceListing;
-import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.ListConfigResourcesResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
@@ -26,15 +27,15 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-public class ListClientMetricsResourcesResponse extends AbstractResponse {
-    private final ListClientMetricsResourcesResponseData data;
+public class ListConfigResourcesResponse extends AbstractResponse {
+    private final ListConfigResourcesResponseData data;
 
-    public 
ListClientMetricsResourcesResponse(ListClientMetricsResourcesResponseData data) 
{
-        super(ApiKeys.LIST_CLIENT_METRICS_RESOURCES);
+    public ListConfigResourcesResponse(ListConfigResourcesResponseData data) {
+        super(ApiKeys.LIST_CONFIG_RESOURCES);
         this.data = data;
     }
 
-    public ListClientMetricsResourcesResponseData data() {
+    public ListConfigResourcesResponseData data() {
         return data;
     }
 
@@ -47,8 +48,8 @@ public class ListClientMetricsResourcesResponse extends 
AbstractResponse {
         return errorCounts(Errors.forCode(data.errorCode()));
     }
 
-    public static ListClientMetricsResourcesResponse parse(Readable readable, 
short version) {
-        return new ListClientMetricsResourcesResponse(new 
ListClientMetricsResourcesResponseData(
+    public static ListConfigResourcesResponse parse(Readable readable, short 
version) {
+        return new ListConfigResourcesResponse(new 
ListConfigResourcesResponseData(
             readable, version));
     }
 
@@ -67,10 +68,22 @@ public class ListClientMetricsResourcesResponse extends 
AbstractResponse {
         data.setThrottleTimeMs(throttleTimeMs);
     }
 
+    public Collection<ConfigResource> configResources() {
+        return data.configResources()
+            .stream()
+            .map(entry ->
+                new ConfigResource(
+                    ConfigResource.Type.forId(entry.resourceType()),
+                    entry.resourceName()
+                )
+            ).collect(Collectors.toList());
+    }
+
     public Collection<ClientMetricsResourceListing> clientMetricsResources() {
-        return data.clientMetricsResources()
+        return data.configResources()
             .stream()
-            .map(entry -> new ClientMetricsResourceListing(entry.name()))
+            .filter(entry -> entry.resourceType() == 
ConfigResource.Type.CLIENT_METRICS.id())
+            .map(entry -> new 
ClientMetricsResourceListing(entry.resourceName()))
             .collect(Collectors.toList());
     }
 }
diff --git 
a/clients/src/main/resources/common/message/ListClientMetricsResourcesRequest.json
 b/clients/src/main/resources/common/message/ListConfigResourcesRequest.json
similarity index 64%
rename from 
clients/src/main/resources/common/message/ListClientMetricsResourcesRequest.json
rename to 
clients/src/main/resources/common/message/ListConfigResourcesRequest.json
index b54dce6b7c7..c4b858a7150 100644
--- 
a/clients/src/main/resources/common/message/ListClientMetricsResourcesRequest.json
+++ b/clients/src/main/resources/common/message/ListConfigResourcesRequest.json
@@ -17,10 +17,15 @@
   "apiKey": 74,
   "type": "request",
   "listeners": ["broker"],
-  "name": "ListClientMetricsResourcesRequest",
-  "validVersions": "0",
+  "name": "ListConfigResourcesRequest",
+  // Version 0 is used as ListClientMetricsResourcesRequest which only lists 
client metrics resources.
+  // Version 1 adds ResourceTypes field (KIP-1142). If there is no specified 
ResourceTypes, it should return all configuration resources.
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
+    { "name": "ResourceTypes", "type": "[]int8", "versions": "1+",
+      "about": "The list of resource type. If the list is empty, it uses 
default supported config resource types."
+    }
   ]
 }
   
\ No newline at end of file
diff --git 
a/clients/src/main/resources/common/message/ListClientMetricsResourcesResponse.json
 b/clients/src/main/resources/common/message/ListConfigResourcesResponse.json
similarity index 66%
rename from 
clients/src/main/resources/common/message/ListClientMetricsResourcesResponse.json
rename to 
clients/src/main/resources/common/message/ListConfigResourcesResponse.json
index 281781c7627..8a2dbdf5a30 100644
--- 
a/clients/src/main/resources/common/message/ListClientMetricsResourcesResponse.json
+++ b/clients/src/main/resources/common/message/ListConfigResourcesResponse.json
@@ -16,18 +16,22 @@
 {
   "apiKey": 74,
   "type": "response",
-  "name": "ListClientMetricsResourcesResponse",
-  "validVersions": "0",
+  "name": "ListConfigResourcesResponse",
+  // Version 0 is used as ListClientMetricsResourcesResponse which returns all 
client metrics resources.
+  // Version 1 adds ResourceType to ConfigResources (KIP-1142).
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
       { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
         "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
       { "name": "ErrorCode", "type": "int16", "versions": "0+",
         "about": "The error code, or 0 if there was no error." },
-      { "name": "ClientMetricsResources", "type": "[]ClientMetricsResource", 
"versions": "0+",
-        "about": "Each client metrics resource in the response.", "fields": [
-        { "name": "Name", "type": "string", "versions": "0+",
-          "about": "The resource name." }
+      { "name": "ConfigResources", "type": "[]ConfigResource", "versions": 
"0+",
+        "about": "Each config resource in the response.", "fields": [
+        { "name": "ResourceName", "type": "string", "versions": "0+",
+          "about": "The resource name." },
+        { "name": "ResourceType", "type": "int8", "versions": "1+", 
"ignorable": true, "default": 16,
+          "about": "The resource type." }
     ]}
   ]
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 3df76fffd36..83356b68ff5 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -141,7 +141,7 @@ import 
org.apache.kafka.common.message.LeaveGroupRequestData;
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
-import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData;
+import org.apache.kafka.common.message.ListConfigResourcesResponseData;
 import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.ListGroupsResponseData.ListedGroup;
 import org.apache.kafka.common.message.ListOffsetsResponseData;
@@ -225,8 +225,8 @@ import 
org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.LeaveGroupRequest;
 import org.apache.kafka.common.requests.LeaveGroupResponse;
-import org.apache.kafka.common.requests.ListClientMetricsResourcesRequest;
-import org.apache.kafka.common.requests.ListClientMetricsResourcesResponse;
+import org.apache.kafka.common.requests.ListConfigResourcesRequest;
+import org.apache.kafka.common.requests.ListConfigResourcesResponse;
 import org.apache.kafka.common.requests.ListGroupsRequest;
 import org.apache.kafka.common.requests.ListGroupsResponse;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
@@ -10672,17 +10672,25 @@ public class KafkaAdminClientTest {
                 new ClientMetricsResourceListing("two")
             );
 
-            ListClientMetricsResourcesResponseData responseData =
-                new 
ListClientMetricsResourcesResponseData().setErrorCode(Errors.NONE.code());
+            ListConfigResourcesResponseData responseData =
+                new 
ListConfigResourcesResponseData().setErrorCode(Errors.NONE.code());
 
-            responseData.clientMetricsResources()
-                .add(new 
ListClientMetricsResourcesResponseData.ClientMetricsResource().setName("one"));
-            responseData.clientMetricsResources()
-                .add((new 
ListClientMetricsResourcesResponseData.ClientMetricsResource()).setName("two"));
+            responseData.configResources()
+                .add(new ListConfigResourcesResponseData
+                    .ConfigResource()
+                    .setResourceName("one")
+                    .setResourceType(ConfigResource.Type.CLIENT_METRICS.id())
+                );
+            responseData.configResources()
+                .add(new ListConfigResourcesResponseData
+                    .ConfigResource()
+                    .setResourceName("two")
+                    .setResourceType(ConfigResource.Type.CLIENT_METRICS.id())
+                );
 
             env.kafkaClient().prepareResponse(
-                request -> request instanceof 
ListClientMetricsResourcesRequest,
-                new ListClientMetricsResourcesResponse(responseData));
+                request -> request instanceof ListConfigResourcesRequest,
+                new ListConfigResourcesResponse(responseData));
 
             ListClientMetricsResourcesResult result = 
env.adminClient().listClientMetricsResources();
             assertEquals(new HashSet<>(expected), new 
HashSet<>(result.all().get()));
@@ -10694,12 +10702,12 @@ public class KafkaAdminClientTest {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             List<ClientMetricsResourceListing> expected = 
Collections.emptyList();
 
-            ListClientMetricsResourcesResponseData responseData =
-                new 
ListClientMetricsResourcesResponseData().setErrorCode(Errors.NONE.code());
+            ListConfigResourcesResponseData responseData =
+                new 
ListConfigResourcesResponseData().setErrorCode(Errors.NONE.code());
 
             env.kafkaClient().prepareResponse(
-                request -> request instanceof 
ListClientMetricsResourcesRequest,
-                new ListClientMetricsResourcesResponse(responseData));
+                request -> request instanceof ListConfigResourcesRequest,
+                new ListConfigResourcesResponse(responseData));
 
             ListClientMetricsResourcesResult result = 
env.adminClient().listClientMetricsResources();
             assertEquals(new HashSet<>(expected), new 
HashSet<>(result.all().get()));
@@ -10710,7 +10718,7 @@ public class KafkaAdminClientTest {
     public void testListClientMetricsResourcesNotSupported() {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().prepareResponse(
-                request -> request instanceof 
ListClientMetricsResourcesRequest,
+                request -> request instanceof ListConfigResourcesRequest,
                 
prepareListClientMetricsResourcesResponse(Errors.UNSUPPORTED_VERSION));
 
             ListClientMetricsResourcesResult result = 
env.adminClient().listClientMetricsResources();
@@ -10776,8 +10784,8 @@ public class KafkaAdminClientTest {
         }
     }
 
-    private static ListClientMetricsResourcesResponse 
prepareListClientMetricsResourcesResponse(Errors error) {
-        return new ListClientMetricsResourcesResponse(new 
ListClientMetricsResourcesResponseData()
+    private static ListConfigResourcesResponse 
prepareListClientMetricsResourcesResponse(Errors error) {
+        return new ListConfigResourcesResponse(new 
ListConfigResourcesResponseData()
                 .setErrorCode(error.code()));
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index e7f503f6ceb..aa50f9db018 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -175,8 +175,8 @@ import 
org.apache.kafka.common.message.JoinGroupResponseData;
 import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
-import org.apache.kafka.common.message.ListClientMetricsResourcesRequestData;
-import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData;
+import org.apache.kafka.common.message.ListConfigResourcesRequestData;
+import org.apache.kafka.common.message.ListConfigResourcesResponseData;
 import org.apache.kafka.common.message.ListGroupsRequestData;
 import org.apache.kafka.common.message.ListGroupsResponseData;
 import 
org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
@@ -1052,7 +1052,7 @@ public class RequestResponseTest {
             case GET_TELEMETRY_SUBSCRIPTIONS: return 
createGetTelemetrySubscriptionsRequest(version);
             case PUSH_TELEMETRY: return createPushTelemetryRequest(version);
             case ASSIGN_REPLICAS_TO_DIRS: return 
createAssignReplicasToDirsRequest(version);
-            case LIST_CLIENT_METRICS_RESOURCES: return 
createListClientMetricsResourcesRequest(version);
+            case LIST_CONFIG_RESOURCES: return 
createListConfigResourcesRequest(version);
             case DESCRIBE_TOPIC_PARTITIONS: return 
createDescribeTopicPartitionsRequest(version);
             case SHARE_GROUP_HEARTBEAT: return 
createShareGroupHeartbeatRequest(version);
             case SHARE_GROUP_DESCRIBE: return 
createShareGroupDescribeRequest(version);
@@ -1147,7 +1147,7 @@ public class RequestResponseTest {
             case GET_TELEMETRY_SUBSCRIPTIONS: return 
createGetTelemetrySubscriptionsResponse();
             case PUSH_TELEMETRY: return createPushTelemetryResponse();
             case ASSIGN_REPLICAS_TO_DIRS: return 
createAssignReplicasToDirsResponse();
-            case LIST_CLIENT_METRICS_RESOURCES: return 
createListClientMetricsResourcesResponse();
+            case LIST_CONFIG_RESOURCES: return 
createListConfigResourcesResponse();
             case DESCRIBE_TOPIC_PARTITIONS: return 
createDescribeTopicPartitionsResponse();
             case SHARE_GROUP_HEARTBEAT: return 
createShareGroupHeartbeatResponse();
             case SHARE_GROUP_DESCRIBE: return 
createShareGroupDescribeResponse();
@@ -3636,15 +3636,15 @@ public class RequestResponseTest {
         return new PushTelemetryResponse(response);
     }
 
-    private ListClientMetricsResourcesRequest 
createListClientMetricsResourcesRequest(short version) {
-        return new ListClientMetricsResourcesRequest.Builder(new 
ListClientMetricsResourcesRequestData()).build(version);
+    private ListConfigResourcesRequest createListConfigResourcesRequest(short 
version) {
+        return new ListConfigResourcesRequest.Builder(new 
ListConfigResourcesRequestData()).build(version);
     }
 
-    private ListClientMetricsResourcesResponse 
createListClientMetricsResourcesResponse() {
-        ListClientMetricsResourcesResponseData response = new 
ListClientMetricsResourcesResponseData();
+    private ListConfigResourcesResponse createListConfigResourcesResponse() {
+        ListConfigResourcesResponseData response = new 
ListConfigResourcesResponseData();
         response.setErrorCode(Errors.NONE.code());
         response.setThrottleTimeMs(10);
-        return new ListClientMetricsResourcesResponse(response);
+        return new ListConfigResourcesResponse(response);
     }
 
     private InitializeShareGroupStateRequest 
createInitializeShareGroupStateRequest(short version) {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index b9459b875e2..ad19902ea79 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.EndpointType
 import org.apache.kafka.common.acl.AclOperation
 import org.apache.kafka.common.acl.AclOperation._
+import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, 
SHARE_GROUP_STATE_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
 import org.apache.kafka.common.internals.{FatalExitError, Plugin, Topic}
@@ -34,7 +35,6 @@ import 
org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartit
 import 
org.apache.kafka.common.message.DeleteRecordsResponseData.{DeleteRecordsPartitionResult,
 DeleteRecordsTopicResult}
 import 
org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic
 import 
org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic
-import 
org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource
 import 
org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition
 import 
org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse,
 ListOffsetsTopicResponse}
 import 
org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition,
 MetadataResponseTopic}
@@ -227,7 +227,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DESCRIBE_TOPIC_PARTITIONS => 
handleDescribeTopicPartitionsRequest(request)
         case ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS => 
handleGetTelemetrySubscriptionsRequest(request)
         case ApiKeys.PUSH_TELEMETRY => handlePushTelemetryRequest(request)
-        case ApiKeys.LIST_CLIENT_METRICS_RESOURCES => 
handleListClientMetricsResources(request)
+        case ApiKeys.LIST_CONFIG_RESOURCES => 
handleListConfigResources(request)
         case ApiKeys.ADD_RAFT_VOTER => forwardToController(request)
         case ApiKeys.REMOVE_RAFT_VOTER => forwardToController(request)
         case ApiKeys.SHARE_GROUP_HEARTBEAT => 
handleShareGroupHeartbeat(request).exceptionally(handleError)
@@ -2927,16 +2927,60 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleListClientMetricsResources(request: RequestChannel.Request): Unit 
= {
-    val listClientMetricsResourcesRequest = 
request.body[ListClientMetricsResourcesRequest]
+  /**
+   * Handle ListConfigResourcesRequest. If no resourceTypes are specified, the handler falls back to the types in ListConfigResourcesRequest#supportedResourceTypes.
+   * If resourceTypes are specified, only config resources of the requested types are returned.
+   * If any requested resource type is not supported, the handler returns UNSUPPORTED_VERSION without listing any resources.
+   */
+  private def handleListConfigResources(request: RequestChannel.Request): Unit 
= {
+    val listConfigResourcesRequest = request.body[ListConfigResourcesRequest]
 
     if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, 
CLUSTER_NAME)) {
-      requestHelper.sendMaybeThrottle(request, 
listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
+      requestHelper.sendMaybeThrottle(request, 
listConfigResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
     } else {
-      val data = new 
ListClientMetricsResourcesResponseData().setClientMetricsResources(
-        clientMetricsManager.listClientMetricsResources.stream.map(
-          name => new 
ClientMetricsResource().setName(name)).collect(Collectors.toList()))
-      requestHelper.sendMaybeThrottle(request, new 
ListClientMetricsResourcesResponse(data))
+      val data = new ListConfigResourcesResponseData()
+
+      val supportedResourceTypes = 
listConfigResourcesRequest.supportedResourceTypes()
+      var resourceTypes = listConfigResourcesRequest.data().resourceTypes()
+      if (resourceTypes.isEmpty) {
+        resourceTypes = supportedResourceTypes.stream().toList
+      }
+
+      resourceTypes.forEach(resourceType =>
+        if (!supportedResourceTypes.contains(resourceType)) {
+          requestHelper.sendMaybeThrottle(request, new 
ListConfigResourcesResponse(data.setErrorCode(Errors.UNSUPPORTED_VERSION.code())))
+          return
+        }
+      )
+
+      val result = new 
util.ArrayList[ListConfigResourcesResponseData.ConfigResource]()
+      if (resourceTypes.contains(ConfigResource.Type.GROUP.id)) {
+        groupConfigManager.groupIds().forEach(id =>
+          result.add(new 
ListConfigResourcesResponseData.ConfigResource().setResourceName(id).setResourceType(ConfigResource.Type.GROUP.id))
+        )
+      }
+      if (resourceTypes.contains(ConfigResource.Type.CLIENT_METRICS.id)) {
+        clientMetricsManager.listClientMetricsResources.forEach(name =>
+          result.add(new 
ListConfigResourcesResponseData.ConfigResource().setResourceName(name).setResourceType(ConfigResource.Type.CLIENT_METRICS.id))
+        )
+      }
+      if (resourceTypes.contains(ConfigResource.Type.BROKER_LOGGER.id)) {
+        
metadataCache.getBrokerNodes(request.context.listenerName).forEach(node =>
+          result.add(new 
ListConfigResourcesResponseData.ConfigResource().setResourceName(node.id.toString).setResourceType(ConfigResource.Type.BROKER_LOGGER.id))
+        )
+      }
+      if (resourceTypes.contains(ConfigResource.Type.BROKER.id)) {
+        
metadataCache.getBrokerNodes(request.context.listenerName).forEach(node =>
+          result.add(new 
ListConfigResourcesResponseData.ConfigResource().setResourceName(node.id.toString).setResourceType(ConfigResource.Type.BROKER.id))
+        )
+      }
+      if (resourceTypes.contains(ConfigResource.Type.TOPIC.id)) {
+        metadataCache.getAllTopics.forEach(name =>
+          result.add(new 
ListConfigResourcesResponseData.ConfigResource().setResourceName(name).setResourceType(ConfigResource.Type.TOPIC.id))
+        )
+      }
+      data.setConfigResources(result)
+      requestHelper.sendMaybeThrottle(request, new 
ListConfigResourcesResponse(data))
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 045e1755006..f698c8ef9ef 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -51,7 +51,6 @@ import 
org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.{De
 import 
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource
 => IAlterConfigsResource, AlterConfigsResourceCollection => 
IAlterConfigsResourceCollection, AlterableConfig => IAlterableConfig, 
AlterableConfigCollection => IAlterableConfigCollection}
 import 
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{AlterConfigsResourceResponse
 => IAlterConfigsResourceResponse}
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
-import 
org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource
 import 
org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, 
ListOffsetsTopic}
 import 
org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse,
 ListOffsetsTopicResponse}
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
@@ -11207,47 +11206,235 @@ class KafkaApisTest extends Logging {
   }
 
   @Test
-  def testListClientMetricsResources(): Unit = {
-    val request = buildRequest(new 
ListClientMetricsResourcesRequest.Builder(new 
ListClientMetricsResourcesRequestData()).build())
-    metadataCache = new KRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+  def testListConfigResourcesV0(): Unit = {
+    val request = buildRequest(new ListConfigResourcesRequest.Builder(new 
ListConfigResourcesRequestData()).build(0))
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val resources = util.Set.of("client-metric1", "client-metric2")
+    when(clientMetricsManager.listClientMetricsResources).thenReturn(resources)
 
-    val resources = new mutable.HashSet[String]
-    resources.add("test1")
-    resources.add("test2")
-    
when(clientMetricsManager.listClientMetricsResources).thenReturn(resources.asJava)
     kafkaApis = createKafkaApis()
     kafkaApis.handle(request, RequestLocal.noCaching)
-    val response = 
verifyNoThrottling[ListClientMetricsResourcesResponse](request)
-    val expectedResponse = new 
ListClientMetricsResourcesResponseData().setClientMetricsResources(
-      resources.map(resource => new 
ClientMetricsResource().setName(resource)).toBuffer.asJava)
-    assertEquals(expectedResponse, response.data)
+    val response = verifyNoThrottling[ListConfigResourcesResponse](request)
+    val expectedResponseData = new ListConfigResourcesResponseData()
+        .setConfigResources(
+          resources.stream.map(resource =>
+            new 
ListConfigResourcesResponseData.ConfigResource().setResourceName(resource)
+          
).collect(util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource]))
+    assertEquals(expectedResponseData, response.data)
+
+    verify(metadataCache, never).getAllTopics
+    verify(groupConfigManager, never).groupIds
+    verify(metadataCache, never).getBrokerNodes(any)
   }
 
   @Test
-  def testListClientMetricsResourcesEmptyResponse(): Unit = {
-    val request = buildRequest(new 
ListClientMetricsResourcesRequest.Builder(new 
ListClientMetricsResourcesRequestData()).build())
-    metadataCache = new KRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+  def testListConfigResourcesV1WithEmptyResourceTypes(): Unit = {
+    val request = buildRequest(new ListConfigResourcesRequest.Builder(new 
ListConfigResourcesRequestData()).build(1))
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val clientMetrics = util.Set.of("client-metric1", "client-metric2")
+    val topics = util.Set.of("topic1", "topic2")
+    val groupIds = util.List.of("group1", "group2")
+    val nodeIds = util.List.of(1, 2)
+    
when(clientMetricsManager.listClientMetricsResources).thenReturn(clientMetrics)
+    when(metadataCache.getAllTopics).thenReturn(topics)
+    when(groupConfigManager.groupIds).thenReturn(groupIds)
+    when(metadataCache.getBrokerNodes(any())).thenReturn(
+      nodeIds.stream().map(id => new Node(id, "localhost", 
1234)).collect(java.util.stream.Collectors.toList()))
+
+    kafkaApis = createKafkaApis()
+    kafkaApis.handle(request, RequestLocal.noCaching)
+    val response = verifyNoThrottling[ListConfigResourcesResponse](request)
+    val expectedResponseData = new ListConfigResourcesResponseData()
+        .setConfigResources(
+          util.stream.Stream.of(
+            groupIds.stream().map(resource =>
+              new 
ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.GROUP.id)
+            ).toList,
+            clientMetrics.stream.map(resource =>
+              new 
ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.CLIENT_METRICS.id)
+            ).toList,
+            nodeIds.stream().map(resource =>
+              new 
ListConfigResourcesResponseData.ConfigResource().setResourceName(resource.toString).setResourceType(ConfigResource.Type.BROKER_LOGGER.id)
+            ).toList,
+            nodeIds.stream().map(resource =>
+              new 
ListConfigResourcesResponseData.ConfigResource().setResourceName(resource.toString).setResourceType(ConfigResource.Type.BROKER.id)
+            ).toList,
+            topics.stream().map(resource =>
+              new 
ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.TOPIC.id)
+            ).toList
+          ).flatMap(s => 
s.stream).collect(util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource]))
+    assertEquals(expectedResponseData, response.data)
+  }
+
+  @Test
+  def testListConfigResourcesV1WithGroup(): Unit = {
+    val request = buildRequest(new ListConfigResourcesRequest.Builder(new 
ListConfigResourcesRequestData()
+      .setResourceTypes(util.List.of(ConfigResource.Type.GROUP.id))).build(1))
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val groupIds = util.List.of("group1", "group2")
+    when(groupConfigManager.groupIds).thenReturn(groupIds)
+
+    kafkaApis = createKafkaApis()
+    kafkaApis.handle(request, RequestLocal.noCaching)
+    val response = verifyNoThrottling[ListConfigResourcesResponse](request)
+    val expectedResponseData = new ListConfigResourcesResponseData()
+        .setConfigResources(
+          groupIds.stream().map(resource =>
+            new 
ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.GROUP.id)
+          
).collect(util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource]))
+    assertEquals(expectedResponseData, response.data)
+
+    verify(metadataCache, never).getAllTopics
+    verify(clientMetricsManager, never).listClientMetricsResources
+    verify(metadataCache, never).getBrokerNodes(any)
+  }
+
+  @Test
+  def testListConfigResourcesV1WithClientMetrics(): Unit = {
+    val request = buildRequest(new ListConfigResourcesRequest.Builder(new 
ListConfigResourcesRequestData()
+      
.setResourceTypes(util.List.of(ConfigResource.Type.CLIENT_METRICS.id))).build(1))
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val clientMetrics = util.Set.of("client-metric1", "client-metric2")
+    
when(clientMetricsManager.listClientMetricsResources).thenReturn(clientMetrics)
+
+    kafkaApis = createKafkaApis()
+    kafkaApis.handle(request, RequestLocal.noCaching)
+    val response = verifyNoThrottling[ListConfigResourcesResponse](request)
+    val expectedResponseData = new ListConfigResourcesResponseData()
+        .setConfigResources(
+          clientMetrics.stream.map(resource =>
+            new 
ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.CLIENT_METRICS.id)
+          
).collect(util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource]))
+    assertEquals(expectedResponseData, response.data)
+
+    verify(metadataCache, never).getAllTopics
+    verify(groupConfigManager, never).groupIds
+    verify(metadataCache, never).getBrokerNodes(any)
+  }
+
+  @Test
+  def testListConfigResourcesV1WithBrokerLogger(): Unit = {
+    val request = buildRequest(new ListConfigResourcesRequest.Builder(new 
ListConfigResourcesRequestData()
+      
.setResourceTypes(util.List.of(ConfigResource.Type.BROKER_LOGGER.id))).build(1))
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val nodeIds = util.List.of(1, 2)
+    when(metadataCache.getBrokerNodes(any())).thenReturn(
+      nodeIds.stream().map(id => new Node(id, "localhost", 
1234)).collect(java.util.stream.Collectors.toList()))
 
-    val resources = new mutable.HashSet[String]
-    
when(clientMetricsManager.listClientMetricsResources).thenReturn(resources.asJava)
     kafkaApis = createKafkaApis()
     kafkaApis.handle(request, RequestLocal.noCaching)
-    val response = 
verifyNoThrottling[ListClientMetricsResourcesResponse](request)
-    val expectedResponse = new ListClientMetricsResourcesResponseData()
+    val response = verifyNoThrottling[ListConfigResourcesResponse](request)
+    val expectedResponseData = new ListConfigResourcesResponseData()
+        .setConfigResources(
+          nodeIds.stream().map(resource =>
+            new 
ListConfigResourcesResponseData.ConfigResource().setResourceName(resource.toString).setResourceType(ConfigResource.Type.BROKER_LOGGER.id)
+          
).collect(java.util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource]))
+    assertEquals(expectedResponseData, response.data)
+
+    verify(metadataCache, never).getAllTopics
+    verify(groupConfigManager, never).groupIds
+    verify(clientMetricsManager, never).listClientMetricsResources
+  }
+
+  @Test
+  def testListConfigResourcesV1WithBroker(): Unit = {
+    val request = buildRequest(new ListConfigResourcesRequest.Builder(new 
ListConfigResourcesRequestData()
+      .setResourceTypes(util.List.of(ConfigResource.Type.BROKER.id))).build(1))
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val nodeIds = util.List.of(1, 2)
+    when(metadataCache.getBrokerNodes(any())).thenReturn(
+      nodeIds.stream().map(id => new Node(id, "localhost", 
1234)).collect(java.util.stream.Collectors.toList()))
+
+    kafkaApis = createKafkaApis()
+    kafkaApis.handle(request, RequestLocal.noCaching)
+    val response = verifyNoThrottling[ListConfigResourcesResponse](request)
+    val expectedResponseData = new ListConfigResourcesResponseData()
+        .setConfigResources(
+          nodeIds.stream().map(resource =>
+            new 
ListConfigResourcesResponseData.ConfigResource().setResourceName(resource.toString).setResourceType(ConfigResource.Type.BROKER.id)
+          
).collect(java.util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource]))
+    assertEquals(expectedResponseData, response.data)
+
+    verify(metadataCache, never).getAllTopics
+    verify(groupConfigManager, never).groupIds
+    verify(clientMetricsManager, never).listClientMetricsResources
+  }
+
+  @Test
+  def testListConfigResourcesV1WithTopic(): Unit = {
+    val request = buildRequest(new ListConfigResourcesRequest.Builder(new 
ListConfigResourcesRequestData()
+      .setResourceTypes(util.List.of(ConfigResource.Type.TOPIC.id))).build(1))
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val topics = util.Set.of("topic1", "topic2")
+    when(metadataCache.getAllTopics).thenReturn(topics)
+
+    kafkaApis = createKafkaApis()
+    kafkaApis.handle(request, RequestLocal.noCaching)
+    val response = verifyNoThrottling[ListConfigResourcesResponse](request)
+    val expectedResponseData = new ListConfigResourcesResponseData()
+        .setConfigResources(
+          topics.stream().map(resource =>
+            new 
ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.TOPIC.id)
+          
).collect(java.util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource]))
+    assertEquals(expectedResponseData, response.data)
+
+    verify(groupConfigManager, never).groupIds
+    verify(clientMetricsManager, never).listClientMetricsResources
+    verify(metadataCache, never).getBrokerNodes(any)
+  }
+
+  @Test
+  def testListConfigResourcesEmptyResponse(): Unit = {
+    val request = buildRequest(new ListConfigResourcesRequest.Builder(new 
ListConfigResourcesRequestData()).build())
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    
when(clientMetricsManager.listClientMetricsResources).thenReturn(util.Set.of)
+    when(metadataCache.getAllTopics).thenReturn(util.Set.of)
+    when(groupConfigManager.groupIds).thenReturn(util.List.of)
+    when(metadataCache.getBrokerNodes(any())).thenReturn(util.List.of)
+
+    kafkaApis = createKafkaApis()
+    kafkaApis.handle(request, RequestLocal.noCaching)
+    val response = verifyNoThrottling[ListConfigResourcesResponse](request)
+    val expectedResponse = new ListConfigResourcesResponseData()
     assertEquals(expectedResponse, response.data)
   }
 
   @Test
-  def testListClientMetricsResourcesWithException(): Unit = {
-    val request = buildRequest(new 
ListClientMetricsResourcesRequest.Builder(new 
ListClientMetricsResourcesRequestData()).build())
+  def testListConfigResourcesV1WithUnknown(): Unit = {
+    val request = buildRequest(new ListConfigResourcesRequest.Builder(new 
ListConfigResourcesRequestData()
+      
.setResourceTypes(util.List.of(ConfigResource.Type.UNKNOWN.id))).build(1))
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    kafkaApis = createKafkaApis()
+    kafkaApis.handle(request, RequestLocal.noCaching)
+    val response = verifyNoThrottling[ListConfigResourcesResponse](request)
+    assertEquals(Errors.UNSUPPORTED_VERSION.code(), response.data.errorCode())
+
+    verify(metadataCache, never).getAllTopics
+    verify(groupConfigManager, never).groupIds
+    verify(clientMetricsManager, never).listClientMetricsResources
+    verify(metadataCache, never).getBrokerNodes(any)
+  }
+
+  @Test
+  def testListConfigResourcesWithException(): Unit = {
+    val request = buildRequest(new ListConfigResourcesRequest.Builder(new 
ListConfigResourcesRequestData()).build())
     metadataCache = new KRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
 
     when(clientMetricsManager.listClientMetricsResources).thenThrow(new 
RuntimeException("test"))
     kafkaApis = createKafkaApis()
     kafkaApis.handle(request, RequestLocal.noCaching)
-    val response = 
verifyNoThrottling[ListClientMetricsResourcesResponse](request)
+    val response = verifyNoThrottling[ListConfigResourcesResponse](request)
 
-    val expectedResponse = new 
ListClientMetricsResourcesResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code)
+    val expectedResponse = new 
ListConfigResourcesResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code)
     assertEquals(expectedResponse, response.data)
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index c55f55fad3e..db03c891e4c 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -641,8 +641,8 @@ class RequestQuotaTest extends BaseRequestTest {
         case ApiKeys.ASSIGN_REPLICAS_TO_DIRS =>
           new AssignReplicasToDirsRequest.Builder(new 
AssignReplicasToDirsRequestData())
 
-        case ApiKeys.LIST_CLIENT_METRICS_RESOURCES =>
-          new ListClientMetricsResourcesRequest.Builder(new 
ListClientMetricsResourcesRequestData())
+        case ApiKeys.LIST_CONFIG_RESOURCES =>
+          new ListConfigResourcesRequest.Builder(new 
ListConfigResourcesRequestData())
 
         case ApiKeys.DESCRIBE_TOPIC_PARTITIONS =>
           new DescribeTopicPartitionsRequest.Builder(new 
DescribeTopicPartitionsRequestData())
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
index 87a45110cd1..80ef0d24a4a 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
@@ -21,6 +21,7 @@ import 
org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
@@ -68,6 +69,10 @@ public class GroupConfigManager implements AutoCloseable {
         return Optional.ofNullable(configMap.get(groupId));
     }
 
+    public List<String> groupIds() {
+        return List.copyOf(configMap.keySet());
+    }
+
     /**
      * Validate the given properties.
      *
diff --git 
a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java 
b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
index 56914f2fe46..e2a76e5caba 100644
--- a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
+++ b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
@@ -130,8 +130,8 @@ import 
org.apache.kafka.common.message.JoinGroupRequestDataJsonConverter;
 import org.apache.kafka.common.message.JoinGroupResponseDataJsonConverter;
 import org.apache.kafka.common.message.LeaveGroupRequestDataJsonConverter;
 import org.apache.kafka.common.message.LeaveGroupResponseDataJsonConverter;
-import 
org.apache.kafka.common.message.ListClientMetricsResourcesRequestDataJsonConverter;
-import 
org.apache.kafka.common.message.ListClientMetricsResourcesResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.ListConfigResourcesRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.ListConfigResourcesResponseDataJsonConverter;
 import org.apache.kafka.common.message.ListGroupsRequestDataJsonConverter;
 import org.apache.kafka.common.message.ListGroupsResponseDataJsonConverter;
 import org.apache.kafka.common.message.ListOffsetsRequestDataJsonConverter;
@@ -312,8 +312,8 @@ import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.LeaveGroupRequest;
 import org.apache.kafka.common.requests.LeaveGroupResponse;
-import org.apache.kafka.common.requests.ListClientMetricsResourcesRequest;
-import org.apache.kafka.common.requests.ListClientMetricsResourcesResponse;
+import org.apache.kafka.common.requests.ListConfigResourcesRequest;
+import org.apache.kafka.common.requests.ListConfigResourcesResponse;
 import org.apache.kafka.common.requests.ListGroupsRequest;
 import org.apache.kafka.common.requests.ListGroupsResponse;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
@@ -507,8 +507,8 @@ public class RequestConvertToJson {
                 return 
JoinGroupRequestDataJsonConverter.write(((JoinGroupRequest) request).data(), 
request.version());
             case LEAVE_GROUP:
                 return 
LeaveGroupRequestDataJsonConverter.write(((LeaveGroupRequest) request).data(), 
request.version());
-            case LIST_CLIENT_METRICS_RESOURCES:
-                return 
ListClientMetricsResourcesRequestDataJsonConverter.write(((ListClientMetricsResourcesRequest)
 request).data(), request.version());
+            case LIST_CONFIG_RESOURCES:
+                return 
ListConfigResourcesRequestDataJsonConverter.write(((ListConfigResourcesRequest) 
request).data(), request.version());
             case LIST_GROUPS:
                 return 
ListGroupsRequestDataJsonConverter.write(((ListGroupsRequest) request).data(), 
request.version());
             case LIST_OFFSETS:
@@ -693,8 +693,8 @@ public class RequestConvertToJson {
                 return 
JoinGroupResponseDataJsonConverter.write(((JoinGroupResponse) response).data(), 
version);
             case LEAVE_GROUP:
                 return 
LeaveGroupResponseDataJsonConverter.write(((LeaveGroupResponse) 
response).data(), version);
-            case LIST_CLIENT_METRICS_RESOURCES:
-                return 
ListClientMetricsResourcesResponseDataJsonConverter.write(((ListClientMetricsResourcesResponse)
 response).data(), version);
+            case LIST_CONFIG_RESOURCES:
+                return 
ListConfigResourcesResponseDataJsonConverter.write(((ListConfigResourcesResponse)
 response).data(), version);
             case LIST_GROUPS:
                 return 
ListGroupsResponseDataJsonConverter.write(((ListGroupsResponse) 
response).data(), version);
             case LIST_OFFSETS:

Reply via email to