This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c26b09c6092 KAFKA-18904: [1/N] Change ListClientMetricsResources API
to ListConfigResources (#19493)
c26b09c6092 is described below
commit c26b09c6092837c47795d645526d3d098d0bd07e
Author: PoAn Yang <[email protected]>
AuthorDate: Thu May 15 17:39:00 2025 -0500
KAFKA-18904: [1/N] Change ListClientMetricsResources API to
ListConfigResources (#19493)
* Change `ListClientMetricsResourcesRequest.json` to
`ListConfigResourcesRequest.json`.
* Change `ListClientMetricsResourcesResponse.json` to
`ListConfigResourcesResponse.json`.
* Change `ListClientMetricsResourcesRequest.java` to
`ListConfigResourcesRequest.java`.
* Change `ListClientMetricsResourcesResponse.java` to
`ListConfigResourcesResponse.java`.
* Change `KafkaApis` to handle both v0 (formerly
`ListClientMetricsResourcesRequest`) and v1 `ListConfigResourcesRequest` requests.
Reviewers: Andrew Schofield <[email protected]>
---------
Signed-off-by: PoAn Yang <[email protected]>
---
.../kafka/clients/admin/KafkaAdminClient.java | 15 +-
.../org/apache/kafka/common/protocol/ApiKeys.java | 2 +-
.../kafka/common/requests/AbstractRequest.java | 4 +-
.../kafka/common/requests/AbstractResponse.java | 4 +-
.../ListClientMetricsResourcesRequest.java | 75 -------
.../requests/ListConfigResourcesRequest.java | 95 +++++++++
...ponse.java => ListConfigResourcesResponse.java} | 33 ++-
...equest.json => ListConfigResourcesRequest.json} | 9 +-
...ponse.json => ListConfigResourcesResponse.json} | 16 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 44 ++--
.../kafka/common/requests/RequestResponseTest.java | 18 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 62 +++++-
.../scala/unit/kafka/server/KafkaApisTest.scala | 233 +++++++++++++++++++--
.../scala/unit/kafka/server/RequestQuotaTest.scala | 4 +-
.../coordinator/group/GroupConfigManager.java | 5 +
.../apache/kafka/network/RequestConvertToJson.java | 16 +-
16 files changed, 462 insertions(+), 173 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 87a951fe26e..0abad889c85 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -159,7 +159,7 @@ import
org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.U
import
org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
-import org.apache.kafka.common.message.ListClientMetricsResourcesRequestData;
+import org.apache.kafka.common.message.ListConfigResourcesRequestData;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
@@ -233,8 +233,8 @@ import
org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
-import org.apache.kafka.common.requests.ListClientMetricsResourcesRequest;
-import org.apache.kafka.common.requests.ListClientMetricsResourcesResponse;
+import org.apache.kafka.common.requests.ListConfigResourcesRequest;
+import org.apache.kafka.common.requests.ListConfigResourcesResponse;
import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
@@ -4887,13 +4887,16 @@ public class KafkaAdminClient extends AdminClient {
new LeastLoadedNodeProvider()) {
@Override
- ListClientMetricsResourcesRequest.Builder createRequest(int
timeoutMs) {
- return new ListClientMetricsResourcesRequest.Builder(new
ListClientMetricsResourcesRequestData());
+ ListConfigResourcesRequest.Builder createRequest(int timeoutMs) {
+ return new ListConfigResourcesRequest.Builder(
+ new ListConfigResourcesRequestData()
+
.setResourceTypes(List.of(ConfigResource.Type.CLIENT_METRICS.id()))
+ );
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
- ListClientMetricsResourcesResponse response =
(ListClientMetricsResourcesResponse) abstractResponse;
+ ListConfigResourcesResponse response =
(ListConfigResourcesResponse) abstractResponse;
if (response.error().isFailure()) {
future.completeExceptionally(response.error().exception());
} else {
diff --git
a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 8534561af8b..89b952e6ce7 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -116,7 +116,7 @@ public enum ApiKeys {
GET_TELEMETRY_SUBSCRIPTIONS(ApiMessageType.GET_TELEMETRY_SUBSCRIPTIONS),
PUSH_TELEMETRY(ApiMessageType.PUSH_TELEMETRY),
ASSIGN_REPLICAS_TO_DIRS(ApiMessageType.ASSIGN_REPLICAS_TO_DIRS),
-
LIST_CLIENT_METRICS_RESOURCES(ApiMessageType.LIST_CLIENT_METRICS_RESOURCES),
+ LIST_CONFIG_RESOURCES(ApiMessageType.LIST_CONFIG_RESOURCES),
DESCRIBE_TOPIC_PARTITIONS(ApiMessageType.DESCRIBE_TOPIC_PARTITIONS),
SHARE_GROUP_HEARTBEAT(ApiMessageType.SHARE_GROUP_HEARTBEAT),
SHARE_GROUP_DESCRIBE(ApiMessageType.SHARE_GROUP_DESCRIBE),
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 9738711e73a..750de2050f4 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -316,8 +316,8 @@ public abstract class AbstractRequest implements
AbstractRequestResponse {
return PushTelemetryRequest.parse(readable, apiVersion);
case ASSIGN_REPLICAS_TO_DIRS:
return AssignReplicasToDirsRequest.parse(readable, apiVersion);
- case LIST_CLIENT_METRICS_RESOURCES:
- return ListClientMetricsResourcesRequest.parse(readable,
apiVersion);
+ case LIST_CONFIG_RESOURCES:
+ return ListConfigResourcesRequest.parse(readable, apiVersion);
case DESCRIBE_TOPIC_PARTITIONS:
return DescribeTopicPartitionsRequest.parse(readable,
apiVersion);
case SHARE_GROUP_HEARTBEAT:
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 01cc03c12b6..bc313078d74 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -253,8 +253,8 @@ public abstract class AbstractResponse implements
AbstractRequestResponse {
return PushTelemetryResponse.parse(readable, version);
case ASSIGN_REPLICAS_TO_DIRS:
return AssignReplicasToDirsResponse.parse(readable, version);
- case LIST_CLIENT_METRICS_RESOURCES:
- return ListClientMetricsResourcesResponse.parse(readable,
version);
+ case LIST_CONFIG_RESOURCES:
+ return ListConfigResourcesResponse.parse(readable, version);
case DESCRIBE_TOPIC_PARTITIONS:
return DescribeTopicPartitionsResponse.parse(readable,
version);
case SHARE_GROUP_HEARTBEAT:
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ListClientMetricsResourcesRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/ListClientMetricsResourcesRequest.java
deleted file mode 100644
index 417740d0ffa..00000000000
---
a/clients/src/main/java/org/apache/kafka/common/requests/ListClientMetricsResourcesRequest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.message.ListClientMetricsResourcesRequestData;
-import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.Readable;
-
-public class ListClientMetricsResourcesRequest extends AbstractRequest {
- public static class Builder extends
AbstractRequest.Builder<ListClientMetricsResourcesRequest> {
- public final ListClientMetricsResourcesRequestData data;
-
- public Builder(ListClientMetricsResourcesRequestData data) {
- super(ApiKeys.LIST_CLIENT_METRICS_RESOURCES);
- this.data = data;
- }
-
- @Override
- public ListClientMetricsResourcesRequest build(short version) {
- return new ListClientMetricsResourcesRequest(data, version);
- }
-
- @Override
- public String toString() {
- return data.toString();
- }
- }
-
- private final ListClientMetricsResourcesRequestData data;
-
- private
ListClientMetricsResourcesRequest(ListClientMetricsResourcesRequestData data,
short version) {
- super(ApiKeys.LIST_CLIENT_METRICS_RESOURCES, version);
- this.data = data;
- }
-
- public ListClientMetricsResourcesRequestData data() {
- return data;
- }
-
- @Override
- public ListClientMetricsResourcesResponse getErrorResponse(int
throttleTimeMs, Throwable e) {
- Errors error = Errors.forException(e);
- ListClientMetricsResourcesResponseData response = new
ListClientMetricsResourcesResponseData()
- .setErrorCode(error.code())
- .setThrottleTimeMs(throttleTimeMs);
- return new ListClientMetricsResourcesResponse(response);
- }
-
- public static ListClientMetricsResourcesRequest parse(Readable readable,
short version) {
- return new ListClientMetricsResourcesRequest(new
ListClientMetricsResourcesRequestData(
- readable, version), version);
- }
-
- @Override
- public String toString(boolean verbose) {
- return data.toString();
- }
-
-}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesRequest.java
new file mode 100644
index 00000000000..3af70938843
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesRequest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.ListConfigResourcesRequestData;
+import org.apache.kafka.common.message.ListConfigResourcesResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.Readable;
+
+import java.util.Set;
+
+public class ListConfigResourcesRequest extends AbstractRequest {
+ public static class Builder extends
AbstractRequest.Builder<ListConfigResourcesRequest> {
+ public final ListConfigResourcesRequestData data;
+
+ public Builder(ListConfigResourcesRequestData data) {
+ super(ApiKeys.LIST_CONFIG_RESOURCES);
+ this.data = data;
+ }
+
+ @Override
+ public ListConfigResourcesRequest build(short version) {
+ return new ListConfigResourcesRequest(data, version);
+ }
+
+ @Override
+ public String toString() {
+ return data.toString();
+ }
+ }
+
+ private final ListConfigResourcesRequestData data;
+
+ private ListConfigResourcesRequest(ListConfigResourcesRequestData data,
short version) {
+ super(ApiKeys.LIST_CONFIG_RESOURCES, version);
+ this.data = data;
+ }
+
+ public ListConfigResourcesRequestData data() {
+ return data;
+ }
+
+ @Override
+ public ListConfigResourcesResponse getErrorResponse(int throttleTimeMs,
Throwable e) {
+ Errors error = Errors.forException(e);
+ ListConfigResourcesResponseData response = new
ListConfigResourcesResponseData()
+ .setErrorCode(error.code())
+ .setThrottleTimeMs(throttleTimeMs);
+ return new ListConfigResourcesResponse(response);
+ }
+
+ public static ListConfigResourcesRequest parse(Readable readable, short
version) {
+ return new ListConfigResourcesRequest(new
ListConfigResourcesRequestData(
+ readable, version), version);
+ }
+
+ @Override
+ public String toString(boolean verbose) {
+ return data.toString();
+ }
+
+ /**
+ * Return the supported config resource types in different request version.
+ * If there is a new config resource type, the ListConfigResourcesRequest
should bump a new request version to include it.
+ * For v0, the supported config resource types contain CLIENT_METRICS (16).
+ * For v1, the supported config resource types contain TOPIC (2), BROKER
(4), BROKER_LOGGER (8), CLIENT_METRICS (16), and GROUP (32).
+ */
+ public Set<Byte> supportedResourceTypes() {
+ return version() == 0 ?
+ Set.of(ConfigResource.Type.CLIENT_METRICS.id()) :
+ Set.of(
+ ConfigResource.Type.TOPIC.id(),
+ ConfigResource.Type.BROKER.id(),
+ ConfigResource.Type.BROKER_LOGGER.id(),
+ ConfigResource.Type.CLIENT_METRICS.id(),
+ ConfigResource.Type.GROUP.id()
+ );
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ListClientMetricsResourcesResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesResponse.java
similarity index 64%
rename from
clients/src/main/java/org/apache/kafka/common/requests/ListClientMetricsResourcesResponse.java
rename to
clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesResponse.java
index c2b3b1601ed..36a4a807f7f 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ListClientMetricsResourcesResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesResponse.java
@@ -17,7 +17,8 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.clients.admin.ClientMetricsResourceListing;
-import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.ListConfigResourcesResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
@@ -26,15 +27,15 @@ import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;
-public class ListClientMetricsResourcesResponse extends AbstractResponse {
- private final ListClientMetricsResourcesResponseData data;
+public class ListConfigResourcesResponse extends AbstractResponse {
+ private final ListConfigResourcesResponseData data;
- public
ListClientMetricsResourcesResponse(ListClientMetricsResourcesResponseData data)
{
- super(ApiKeys.LIST_CLIENT_METRICS_RESOURCES);
+ public ListConfigResourcesResponse(ListConfigResourcesResponseData data) {
+ super(ApiKeys.LIST_CONFIG_RESOURCES);
this.data = data;
}
- public ListClientMetricsResourcesResponseData data() {
+ public ListConfigResourcesResponseData data() {
return data;
}
@@ -47,8 +48,8 @@ public class ListClientMetricsResourcesResponse extends
AbstractResponse {
return errorCounts(Errors.forCode(data.errorCode()));
}
- public static ListClientMetricsResourcesResponse parse(Readable readable,
short version) {
- return new ListClientMetricsResourcesResponse(new
ListClientMetricsResourcesResponseData(
+ public static ListConfigResourcesResponse parse(Readable readable, short
version) {
+ return new ListConfigResourcesResponse(new
ListConfigResourcesResponseData(
readable, version));
}
@@ -67,10 +68,22 @@ public class ListClientMetricsResourcesResponse extends
AbstractResponse {
data.setThrottleTimeMs(throttleTimeMs);
}
+ public Collection<ConfigResource> configResources() {
+ return data.configResources()
+ .stream()
+ .map(entry ->
+ new ConfigResource(
+ ConfigResource.Type.forId(entry.resourceType()),
+ entry.resourceName()
+ )
+ ).collect(Collectors.toList());
+ }
+
public Collection<ClientMetricsResourceListing> clientMetricsResources() {
- return data.clientMetricsResources()
+ return data.configResources()
.stream()
- .map(entry -> new ClientMetricsResourceListing(entry.name()))
+ .filter(entry -> entry.resourceType() ==
ConfigResource.Type.CLIENT_METRICS.id())
+ .map(entry -> new
ClientMetricsResourceListing(entry.resourceName()))
.collect(Collectors.toList());
}
}
diff --git
a/clients/src/main/resources/common/message/ListClientMetricsResourcesRequest.json
b/clients/src/main/resources/common/message/ListConfigResourcesRequest.json
similarity index 64%
rename from
clients/src/main/resources/common/message/ListClientMetricsResourcesRequest.json
rename to
clients/src/main/resources/common/message/ListConfigResourcesRequest.json
index b54dce6b7c7..c4b858a7150 100644
---
a/clients/src/main/resources/common/message/ListClientMetricsResourcesRequest.json
+++ b/clients/src/main/resources/common/message/ListConfigResourcesRequest.json
@@ -17,10 +17,15 @@
"apiKey": 74,
"type": "request",
"listeners": ["broker"],
- "name": "ListClientMetricsResourcesRequest",
- "validVersions": "0",
+ "name": "ListConfigResourcesRequest",
+ // Version 0 is used as ListClientMetricsResourcesRequest which only lists
client metrics resources.
+ // Version 1 adds ResourceTypes field (KIP-1142). If there is no specified
ResourceTypes, it should return all configuration resources.
+ "validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
+ { "name": "ResourceTypes", "type": "[]int8", "versions": "1+",
+ "about": "The list of resource type. If the list is empty, it uses
default supported config resource types."
+ }
]
}
\ No newline at end of file
diff --git
a/clients/src/main/resources/common/message/ListClientMetricsResourcesResponse.json
b/clients/src/main/resources/common/message/ListConfigResourcesResponse.json
similarity index 66%
rename from
clients/src/main/resources/common/message/ListClientMetricsResourcesResponse.json
rename to
clients/src/main/resources/common/message/ListConfigResourcesResponse.json
index 281781c7627..8a2dbdf5a30 100644
---
a/clients/src/main/resources/common/message/ListClientMetricsResourcesResponse.json
+++ b/clients/src/main/resources/common/message/ListConfigResourcesResponse.json
@@ -16,18 +16,22 @@
{
"apiKey": 74,
"type": "response",
- "name": "ListClientMetricsResourcesResponse",
- "validVersions": "0",
+ "name": "ListConfigResourcesResponse",
+ // Version 0 is used as ListClientMetricsResourcesResponse which returns all
client metrics resources.
+ // Version 1 adds ResourceType to ConfigResources (KIP-1142).
+ "validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was
throttled due to a quota violation, or zero if the request did not violate any
quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
- { "name": "ClientMetricsResources", "type": "[]ClientMetricsResource",
"versions": "0+",
- "about": "Each client metrics resource in the response.", "fields": [
- { "name": "Name", "type": "string", "versions": "0+",
- "about": "The resource name." }
+ { "name": "ConfigResources", "type": "[]ConfigResource", "versions":
"0+",
+ "about": "Each config resource in the response.", "fields": [
+ { "name": "ResourceName", "type": "string", "versions": "0+",
+ "about": "The resource name." },
+ { "name": "ResourceType", "type": "int8", "versions": "1+",
"ignorable": true, "default": 16,
+ "about": "The resource type." }
]}
]
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 3df76fffd36..83356b68ff5 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -141,7 +141,7 @@ import
org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
-import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData;
+import org.apache.kafka.common.message.ListConfigResourcesResponseData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ListGroupsResponseData.ListedGroup;
import org.apache.kafka.common.message.ListOffsetsResponseData;
@@ -225,8 +225,8 @@ import
org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
-import org.apache.kafka.common.requests.ListClientMetricsResourcesRequest;
-import org.apache.kafka.common.requests.ListClientMetricsResourcesResponse;
+import org.apache.kafka.common.requests.ListConfigResourcesRequest;
+import org.apache.kafka.common.requests.ListConfigResourcesResponse;
import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
@@ -10672,17 +10672,25 @@ public class KafkaAdminClientTest {
new ClientMetricsResourceListing("two")
);
- ListClientMetricsResourcesResponseData responseData =
- new
ListClientMetricsResourcesResponseData().setErrorCode(Errors.NONE.code());
+ ListConfigResourcesResponseData responseData =
+ new
ListConfigResourcesResponseData().setErrorCode(Errors.NONE.code());
- responseData.clientMetricsResources()
- .add(new
ListClientMetricsResourcesResponseData.ClientMetricsResource().setName("one"));
- responseData.clientMetricsResources()
- .add((new
ListClientMetricsResourcesResponseData.ClientMetricsResource()).setName("two"));
+ responseData.configResources()
+ .add(new ListConfigResourcesResponseData
+ .ConfigResource()
+ .setResourceName("one")
+ .setResourceType(ConfigResource.Type.CLIENT_METRICS.id())
+ );
+ responseData.configResources()
+ .add(new ListConfigResourcesResponseData
+ .ConfigResource()
+ .setResourceName("two")
+ .setResourceType(ConfigResource.Type.CLIENT_METRICS.id())
+ );
env.kafkaClient().prepareResponse(
- request -> request instanceof
ListClientMetricsResourcesRequest,
- new ListClientMetricsResourcesResponse(responseData));
+ request -> request instanceof ListConfigResourcesRequest,
+ new ListConfigResourcesResponse(responseData));
ListClientMetricsResourcesResult result =
env.adminClient().listClientMetricsResources();
assertEquals(new HashSet<>(expected), new
HashSet<>(result.all().get()));
@@ -10694,12 +10702,12 @@ public class KafkaAdminClientTest {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
List<ClientMetricsResourceListing> expected =
Collections.emptyList();
- ListClientMetricsResourcesResponseData responseData =
- new
ListClientMetricsResourcesResponseData().setErrorCode(Errors.NONE.code());
+ ListConfigResourcesResponseData responseData =
+ new
ListConfigResourcesResponseData().setErrorCode(Errors.NONE.code());
env.kafkaClient().prepareResponse(
- request -> request instanceof
ListClientMetricsResourcesRequest,
- new ListClientMetricsResourcesResponse(responseData));
+ request -> request instanceof ListConfigResourcesRequest,
+ new ListConfigResourcesResponse(responseData));
ListClientMetricsResourcesResult result =
env.adminClient().listClientMetricsResources();
assertEquals(new HashSet<>(expected), new
HashSet<>(result.all().get()));
@@ -10710,7 +10718,7 @@ public class KafkaAdminClientTest {
public void testListClientMetricsResourcesNotSupported() {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().prepareResponse(
- request -> request instanceof
ListClientMetricsResourcesRequest,
+ request -> request instanceof ListConfigResourcesRequest,
prepareListClientMetricsResourcesResponse(Errors.UNSUPPORTED_VERSION));
ListClientMetricsResourcesResult result =
env.adminClient().listClientMetricsResources();
@@ -10776,8 +10784,8 @@ public class KafkaAdminClientTest {
}
}
- private static ListClientMetricsResourcesResponse
prepareListClientMetricsResourcesResponse(Errors error) {
- return new ListClientMetricsResourcesResponse(new
ListClientMetricsResourcesResponseData()
+ private static ListConfigResourcesResponse
prepareListClientMetricsResourcesResponse(Errors error) {
+ return new ListConfigResourcesResponse(new
ListConfigResourcesResponseData()
.setErrorCode(error.code()));
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index e7f503f6ceb..aa50f9db018 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -175,8 +175,8 @@ import
org.apache.kafka.common.message.JoinGroupResponseData;
import
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData;
-import org.apache.kafka.common.message.ListClientMetricsResourcesRequestData;
-import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData;
+import org.apache.kafka.common.message.ListConfigResourcesRequestData;
+import org.apache.kafka.common.message.ListConfigResourcesResponseData;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import
org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
@@ -1052,7 +1052,7 @@ public class RequestResponseTest {
case GET_TELEMETRY_SUBSCRIPTIONS: return
createGetTelemetrySubscriptionsRequest(version);
case PUSH_TELEMETRY: return createPushTelemetryRequest(version);
case ASSIGN_REPLICAS_TO_DIRS: return
createAssignReplicasToDirsRequest(version);
- case LIST_CLIENT_METRICS_RESOURCES: return
createListClientMetricsResourcesRequest(version);
+ case LIST_CONFIG_RESOURCES: return
createListConfigResourcesRequest(version);
case DESCRIBE_TOPIC_PARTITIONS: return
createDescribeTopicPartitionsRequest(version);
case SHARE_GROUP_HEARTBEAT: return
createShareGroupHeartbeatRequest(version);
case SHARE_GROUP_DESCRIBE: return
createShareGroupDescribeRequest(version);
@@ -1147,7 +1147,7 @@ public class RequestResponseTest {
case GET_TELEMETRY_SUBSCRIPTIONS: return
createGetTelemetrySubscriptionsResponse();
case PUSH_TELEMETRY: return createPushTelemetryResponse();
case ASSIGN_REPLICAS_TO_DIRS: return
createAssignReplicasToDirsResponse();
- case LIST_CLIENT_METRICS_RESOURCES: return
createListClientMetricsResourcesResponse();
+ case LIST_CONFIG_RESOURCES: return
createListConfigResourcesResponse();
case DESCRIBE_TOPIC_PARTITIONS: return
createDescribeTopicPartitionsResponse();
case SHARE_GROUP_HEARTBEAT: return
createShareGroupHeartbeatResponse();
case SHARE_GROUP_DESCRIBE: return
createShareGroupDescribeResponse();
@@ -3636,15 +3636,15 @@ public class RequestResponseTest {
return new PushTelemetryResponse(response);
}
- private ListClientMetricsResourcesRequest
createListClientMetricsResourcesRequest(short version) {
- return new ListClientMetricsResourcesRequest.Builder(new
ListClientMetricsResourcesRequestData()).build(version);
+ private ListConfigResourcesRequest createListConfigResourcesRequest(short
version) {
+ return new ListConfigResourcesRequest.Builder(new
ListConfigResourcesRequestData()).build(version);
}
- private ListClientMetricsResourcesResponse
createListClientMetricsResourcesResponse() {
- ListClientMetricsResourcesResponseData response = new
ListClientMetricsResourcesResponseData();
+ private ListConfigResourcesResponse createListConfigResourcesResponse() {
+ ListConfigResourcesResponseData response = new
ListConfigResourcesResponseData();
response.setErrorCode(Errors.NONE.code());
response.setThrottleTimeMs(10);
- return new ListClientMetricsResourcesResponse(response);
+ return new ListConfigResourcesResponse(response);
}
private InitializeShareGroupStateRequest
createInitializeShareGroupStateRequest(short version) {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index b9459b875e2..ad19902ea79 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.EndpointType
import org.apache.kafka.common.acl.AclOperation
import org.apache.kafka.common.acl.AclOperation._
+import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME,
SHARE_GROUP_STATE_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
import org.apache.kafka.common.internals.{FatalExitError, Plugin, Topic}
@@ -34,7 +35,6 @@ import
org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartit
import
org.apache.kafka.common.message.DeleteRecordsResponseData.{DeleteRecordsPartitionResult,
DeleteRecordsTopicResult}
import
org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic
import
org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic
-import
org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource
import
org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition
import
org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse,
ListOffsetsTopicResponse}
import
org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition,
MetadataResponseTopic}
@@ -227,7 +227,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.DESCRIBE_TOPIC_PARTITIONS =>
handleDescribeTopicPartitionsRequest(request)
case ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS =>
handleGetTelemetrySubscriptionsRequest(request)
case ApiKeys.PUSH_TELEMETRY => handlePushTelemetryRequest(request)
- case ApiKeys.LIST_CLIENT_METRICS_RESOURCES =>
handleListClientMetricsResources(request)
+ case ApiKeys.LIST_CONFIG_RESOURCES =>
handleListConfigResources(request)
case ApiKeys.ADD_RAFT_VOTER => forwardToController(request)
case ApiKeys.REMOVE_RAFT_VOTER => forwardToController(request)
case ApiKeys.SHARE_GROUP_HEARTBEAT =>
handleShareGroupHeartbeat(request).exceptionally(handleError)
@@ -2927,16 +2927,60 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- def handleListClientMetricsResources(request: RequestChannel.Request): Unit
= {
- val listClientMetricsResourcesRequest =
request.body[ListClientMetricsResourcesRequest]
+ /**
+ * Handle ListConfigResourcesRequest. If no resourceTypes are specified, it
lists the config resources of every type in ListConfigResourcesRequest#supportedResourceTypes.
+ * If resourceTypes are specified, it returns only the config resources of
the requested types.
+ * If any requested resource type is not supported, the handler returns
UNSUPPORTED_VERSION without listing any resources.
+ */
+ private def handleListConfigResources(request: RequestChannel.Request): Unit
= {
+ val listConfigResourcesRequest = request.body[ListConfigResourcesRequest]
if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER,
CLUSTER_NAME)) {
- requestHelper.sendMaybeThrottle(request,
listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
+ requestHelper.sendMaybeThrottle(request,
listConfigResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
} else {
- val data = new
ListClientMetricsResourcesResponseData().setClientMetricsResources(
- clientMetricsManager.listClientMetricsResources.stream.map(
- name => new
ClientMetricsResource().setName(name)).collect(Collectors.toList()))
- requestHelper.sendMaybeThrottle(request, new
ListClientMetricsResourcesResponse(data))
+ val data = new ListConfigResourcesResponseData()
+
+ val supportedResourceTypes =
listConfigResourcesRequest.supportedResourceTypes()
+ var resourceTypes = listConfigResourcesRequest.data().resourceTypes()
+ if (resourceTypes.isEmpty) {
+ resourceTypes = supportedResourceTypes.stream().toList
+ }
+
+ resourceTypes.forEach(resourceType =>
+ if (!supportedResourceTypes.contains(resourceType)) {
+ requestHelper.sendMaybeThrottle(request, new
ListConfigResourcesResponse(data.setErrorCode(Errors.UNSUPPORTED_VERSION.code())))
+ return
+ }
+ )
+
+ val result = new
util.ArrayList[ListConfigResourcesResponseData.ConfigResource]()
+ if (resourceTypes.contains(ConfigResource.Type.GROUP.id)) {
+ groupConfigManager.groupIds().forEach(id =>
+ result.add(new
ListConfigResourcesResponseData.ConfigResource().setResourceName(id).setResourceType(ConfigResource.Type.GROUP.id))
+ )
+ }
+ if (resourceTypes.contains(ConfigResource.Type.CLIENT_METRICS.id)) {
+ clientMetricsManager.listClientMetricsResources.forEach(name =>
+ result.add(new
ListConfigResourcesResponseData.ConfigResource().setResourceName(name).setResourceType(ConfigResource.Type.CLIENT_METRICS.id))
+ )
+ }
+ if (resourceTypes.contains(ConfigResource.Type.BROKER_LOGGER.id)) {
+
metadataCache.getBrokerNodes(request.context.listenerName).forEach(node =>
+ result.add(new
ListConfigResourcesResponseData.ConfigResource().setResourceName(node.id.toString).setResourceType(ConfigResource.Type.BROKER_LOGGER.id))
+ )
+ }
+ if (resourceTypes.contains(ConfigResource.Type.BROKER.id)) {
+
metadataCache.getBrokerNodes(request.context.listenerName).forEach(node =>
+ result.add(new
ListConfigResourcesResponseData.ConfigResource().setResourceName(node.id.toString).setResourceType(ConfigResource.Type.BROKER.id))
+ )
+ }
+ if (resourceTypes.contains(ConfigResource.Type.TOPIC.id)) {
+ metadataCache.getAllTopics.forEach(name =>
+ result.add(new
ListConfigResourcesResponseData.ConfigResource().setResourceName(name).setResourceType(ConfigResource.Type.TOPIC.id))
+ )
+ }
+ data.setConfigResources(result)
+ requestHelper.sendMaybeThrottle(request, new
ListConfigResourcesResponse(data))
}
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 045e1755006..f698c8ef9ef 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -51,7 +51,6 @@ import
org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.{De
import
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource
=> IAlterConfigsResource, AlterConfigsResourceCollection =>
IAlterConfigsResourceCollection, AlterableConfig => IAlterableConfig,
AlterableConfigCollection => IAlterableConfigCollection}
import
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{AlterConfigsResourceResponse
=> IAlterConfigsResourceResponse}
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
-import
org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource
import
org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition,
ListOffsetsTopic}
import
org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse,
ListOffsetsTopicResponse}
import
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
@@ -11207,47 +11206,235 @@ class KafkaApisTest extends Logging {
}
@Test
- def testListClientMetricsResources(): Unit = {
- val request = buildRequest(new
ListClientMetricsResourcesRequest.Builder(new
ListClientMetricsResourcesRequestData()).build())
- metadataCache = new KRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
+ def testListConfigResourcesV0(): Unit = {
+ val request = buildRequest(new ListConfigResourcesRequest.Builder(new
ListConfigResourcesRequestData()).build(0))
+ metadataCache = mock(classOf[KRaftMetadataCache])
+
+ val resources = util.Set.of("client-metric1", "client-metric2")
+ when(clientMetricsManager.listClientMetricsResources).thenReturn(resources)
- val resources = new mutable.HashSet[String]
- resources.add("test1")
- resources.add("test2")
-
when(clientMetricsManager.listClientMetricsResources).thenReturn(resources.asJava)
kafkaApis = createKafkaApis()
kafkaApis.handle(request, RequestLocal.noCaching)
- val response =
verifyNoThrottling[ListClientMetricsResourcesResponse](request)
- val expectedResponse = new
ListClientMetricsResourcesResponseData().setClientMetricsResources(
- resources.map(resource => new
ClientMetricsResource().setName(resource)).toBuffer.asJava)
- assertEquals(expectedResponse, response.data)
+ val response = verifyNoThrottling[ListConfigResourcesResponse](request)
+ val expectedResponseData = new ListConfigResourcesResponseData()
+ .setConfigResources(
+ resources.stream.map(resource =>
+ new
ListConfigResourcesResponseData.ConfigResource().setResourceName(resource)
+
).collect(util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource]))
+ assertEquals(expectedResponseData, response.data)
+
+ verify(metadataCache, never).getAllTopics
+ verify(groupConfigManager, never).groupIds
+ verify(metadataCache, never).getBrokerNodes(any)
}
@Test
- def testListClientMetricsResourcesEmptyResponse(): Unit = {
- val request = buildRequest(new
ListClientMetricsResourcesRequest.Builder(new
ListClientMetricsResourcesRequestData()).build())
- metadataCache = new KRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
+ def testListConfigResourcesV1WithEmptyResourceTypes(): Unit = {
+ val request = buildRequest(new ListConfigResourcesRequest.Builder(new
ListConfigResourcesRequestData()).build(1))
+ metadataCache = mock(classOf[KRaftMetadataCache])
+
+ val clientMetrics = util.Set.of("client-metric1", "client-metric2")
+ val topics = util.Set.of("topic1", "topic2")
+ val groupIds = util.List.of("group1", "group2")
+ val nodeIds = util.List.of(1, 2)
+
when(clientMetricsManager.listClientMetricsResources).thenReturn(clientMetrics)
+ when(metadataCache.getAllTopics).thenReturn(topics)
+ when(groupConfigManager.groupIds).thenReturn(groupIds)
+ when(metadataCache.getBrokerNodes(any())).thenReturn(
+ nodeIds.stream().map(id => new Node(id, "localhost",
1234)).collect(java.util.stream.Collectors.toList()))
+
+ kafkaApis = createKafkaApis()
+ kafkaApis.handle(request, RequestLocal.noCaching)
+ val response = verifyNoThrottling[ListConfigResourcesResponse](request)
+ val expectedResponseData = new ListConfigResourcesResponseData()
+ .setConfigResources(
+ util.stream.Stream.of(
+ groupIds.stream().map(resource =>
+ new
ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.GROUP.id)
+ ).toList,
+ clientMetrics.stream.map(resource =>
+ new
ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.CLIENT_METRICS.id)
+ ).toList,
+ nodeIds.stream().map(resource =>
+ new
ListConfigResourcesResponseData.ConfigResource().setResourceName(resource.toString).setResourceType(ConfigResource.Type.BROKER_LOGGER.id)
+ ).toList,
+ nodeIds.stream().map(resource =>
+ new
ListConfigResourcesResponseData.ConfigResource().setResourceName(resource.toString).setResourceType(ConfigResource.Type.BROKER.id)
+ ).toList,
+ topics.stream().map(resource =>
+ new
ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.TOPIC.id)
+ ).toList
+ ).flatMap(s =>
s.stream).collect(util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource]))
+ assertEquals(expectedResponseData, response.data)
+ }
+
+ @Test
+ def testListConfigResourcesV1WithGroup(): Unit = {
+ val request = buildRequest(new ListConfigResourcesRequest.Builder(new
ListConfigResourcesRequestData()
+ .setResourceTypes(util.List.of(ConfigResource.Type.GROUP.id))).build(1))
+ metadataCache = mock(classOf[KRaftMetadataCache])
+
+ val groupIds = util.List.of("group1", "group2")
+ when(groupConfigManager.groupIds).thenReturn(groupIds)
+
+ kafkaApis = createKafkaApis()
+ kafkaApis.handle(request, RequestLocal.noCaching)
+ val response = verifyNoThrottling[ListConfigResourcesResponse](request)
+ val expectedResponseData = new ListConfigResourcesResponseData()
+ .setConfigResources(
+ groupIds.stream().map(resource =>
+ new
ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.GROUP.id)
+
).collect(util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource]))
+ assertEquals(expectedResponseData, response.data)
+
+ verify(metadataCache, never).getAllTopics
+ verify(clientMetricsManager, never).listClientMetricsResources
+ verify(metadataCache, never).getBrokerNodes(any)
+ }
+
+ @Test
+ def testListConfigResourcesV1WithClientMetrics(): Unit = {
+ val request = buildRequest(new ListConfigResourcesRequest.Builder(new
ListConfigResourcesRequestData()
+
.setResourceTypes(util.List.of(ConfigResource.Type.CLIENT_METRICS.id))).build(1))
+ metadataCache = mock(classOf[KRaftMetadataCache])
+
+ val clientMetrics = util.Set.of("client-metric1", "client-metric2")
+
when(clientMetricsManager.listClientMetricsResources).thenReturn(clientMetrics)
+
+ kafkaApis = createKafkaApis()
+ kafkaApis.handle(request, RequestLocal.noCaching)
+ val response = verifyNoThrottling[ListConfigResourcesResponse](request)
+ val expectedResponseData = new ListConfigResourcesResponseData()
+ .setConfigResources(
+ clientMetrics.stream.map(resource =>
+ new
ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.CLIENT_METRICS.id)
+
).collect(util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource]))
+ assertEquals(expectedResponseData, response.data)
+
+ verify(metadataCache, never).getAllTopics
+ verify(groupConfigManager, never).groupIds
+ verify(metadataCache, never).getBrokerNodes(any)
+ }
+
+ @Test
+ def testListConfigResourcesV1WithBrokerLogger(): Unit = {
+ val request = buildRequest(new ListConfigResourcesRequest.Builder(new
ListConfigResourcesRequestData()
+
.setResourceTypes(util.List.of(ConfigResource.Type.BROKER_LOGGER.id))).build(1))
+ metadataCache = mock(classOf[KRaftMetadataCache])
+
+ val nodeIds = util.List.of(1, 2)
+ when(metadataCache.getBrokerNodes(any())).thenReturn(
+ nodeIds.stream().map(id => new Node(id, "localhost",
1234)).collect(java.util.stream.Collectors.toList()))
- val resources = new mutable.HashSet[String]
-
when(clientMetricsManager.listClientMetricsResources).thenReturn(resources.asJava)
kafkaApis = createKafkaApis()
kafkaApis.handle(request, RequestLocal.noCaching)
- val response =
verifyNoThrottling[ListClientMetricsResourcesResponse](request)
- val expectedResponse = new ListClientMetricsResourcesResponseData()
+ val response = verifyNoThrottling[ListConfigResourcesResponse](request)
+ val expectedResponseData = new ListConfigResourcesResponseData()
+ .setConfigResources(
+ nodeIds.stream().map(resource =>
+ new
ListConfigResourcesResponseData.ConfigResource().setResourceName(resource.toString).setResourceType(ConfigResource.Type.BROKER_LOGGER.id)
+
).collect(java.util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource]))
+ assertEquals(expectedResponseData, response.data)
+
+ verify(metadataCache, never).getAllTopics
+ verify(groupConfigManager, never).groupIds
+ verify(clientMetricsManager, never).listClientMetricsResources
+ }
+
+ @Test
+ def testListConfigResourcesV1WithBroker(): Unit = {
+ val request = buildRequest(new ListConfigResourcesRequest.Builder(new
ListConfigResourcesRequestData()
+ .setResourceTypes(util.List.of(ConfigResource.Type.BROKER.id))).build(1))
+ metadataCache = mock(classOf[KRaftMetadataCache])
+
+ val nodeIds = util.List.of(1, 2)
+ when(metadataCache.getBrokerNodes(any())).thenReturn(
+ nodeIds.stream().map(id => new Node(id, "localhost",
1234)).collect(java.util.stream.Collectors.toList()))
+
+ kafkaApis = createKafkaApis()
+ kafkaApis.handle(request, RequestLocal.noCaching)
+ val response = verifyNoThrottling[ListConfigResourcesResponse](request)
+ val expectedResponseData = new ListConfigResourcesResponseData()
+ .setConfigResources(
+ nodeIds.stream().map(resource =>
+ new
ListConfigResourcesResponseData.ConfigResource().setResourceName(resource.toString).setResourceType(ConfigResource.Type.BROKER.id)
+
).collect(java.util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource]))
+ assertEquals(expectedResponseData, response.data)
+
+ verify(metadataCache, never).getAllTopics
+ verify(groupConfigManager, never).groupIds
+ verify(clientMetricsManager, never).listClientMetricsResources
+ }
+
+ @Test
+ def testListConfigResourcesV1WithTopic(): Unit = {
+ val request = buildRequest(new ListConfigResourcesRequest.Builder(new
ListConfigResourcesRequestData()
+ .setResourceTypes(util.List.of(ConfigResource.Type.TOPIC.id))).build(1))
+ metadataCache = mock(classOf[KRaftMetadataCache])
+
+ val topics = util.Set.of("topic1", "topic2")
+ when(metadataCache.getAllTopics).thenReturn(topics)
+
+ kafkaApis = createKafkaApis()
+ kafkaApis.handle(request, RequestLocal.noCaching)
+ val response = verifyNoThrottling[ListConfigResourcesResponse](request)
+ val expectedResponseData = new ListConfigResourcesResponseData()
+ .setConfigResources(
+ topics.stream().map(resource =>
+ new
ListConfigResourcesResponseData.ConfigResource().setResourceName(resource).setResourceType(ConfigResource.Type.TOPIC.id)
+
).collect(java.util.stream.Collectors.toList[ListConfigResourcesResponseData.ConfigResource]))
+ assertEquals(expectedResponseData, response.data)
+
+ verify(groupConfigManager, never).groupIds
+ verify(clientMetricsManager, never).listClientMetricsResources
+ verify(metadataCache, never).getBrokerNodes(any)
+ }
+
+ @Test
+ def testListConfigResourcesEmptyResponse(): Unit = {
+ val request = buildRequest(new ListConfigResourcesRequest.Builder(new
ListConfigResourcesRequestData()).build())
+ metadataCache = mock(classOf[KRaftMetadataCache])
+
+
when(clientMetricsManager.listClientMetricsResources).thenReturn(util.Set.of)
+ when(metadataCache.getAllTopics).thenReturn(util.Set.of)
+ when(groupConfigManager.groupIds).thenReturn(util.List.of)
+ when(metadataCache.getBrokerNodes(any())).thenReturn(util.List.of)
+
+ kafkaApis = createKafkaApis()
+ kafkaApis.handle(request, RequestLocal.noCaching)
+ val response = verifyNoThrottling[ListConfigResourcesResponse](request)
+ val expectedResponse = new ListConfigResourcesResponseData()
assertEquals(expectedResponse, response.data)
}
@Test
- def testListClientMetricsResourcesWithException(): Unit = {
- val request = buildRequest(new
ListClientMetricsResourcesRequest.Builder(new
ListClientMetricsResourcesRequestData()).build())
+ def testListConfigResourcesV1WithUnknown(): Unit = {
+ val request = buildRequest(new ListConfigResourcesRequest.Builder(new
ListConfigResourcesRequestData()
+
.setResourceTypes(util.List.of(ConfigResource.Type.UNKNOWN.id))).build(1))
+ metadataCache = mock(classOf[KRaftMetadataCache])
+
+ kafkaApis = createKafkaApis()
+ kafkaApis.handle(request, RequestLocal.noCaching)
+ val response = verifyNoThrottling[ListConfigResourcesResponse](request)
+ assertEquals(Errors.UNSUPPORTED_VERSION.code(), response.data.errorCode())
+
+ verify(metadataCache, never).getAllTopics
+ verify(groupConfigManager, never).groupIds
+ verify(clientMetricsManager, never).listClientMetricsResources
+ verify(metadataCache, never).getBrokerNodes(any)
+ }
+
+ @Test
+ def testListConfigResourcesWithException(): Unit = {
+ val request = buildRequest(new ListConfigResourcesRequest.Builder(new
ListConfigResourcesRequestData()).build())
metadataCache = new KRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
when(clientMetricsManager.listClientMetricsResources).thenThrow(new
RuntimeException("test"))
kafkaApis = createKafkaApis()
kafkaApis.handle(request, RequestLocal.noCaching)
- val response =
verifyNoThrottling[ListClientMetricsResourcesResponse](request)
+ val response = verifyNoThrottling[ListConfigResourcesResponse](request)
- val expectedResponse = new
ListClientMetricsResourcesResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code)
+ val expectedResponse = new
ListConfigResourcesResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code)
assertEquals(expectedResponse, response.data)
}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index c55f55fad3e..db03c891e4c 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -641,8 +641,8 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.ASSIGN_REPLICAS_TO_DIRS =>
new AssignReplicasToDirsRequest.Builder(new
AssignReplicasToDirsRequestData())
- case ApiKeys.LIST_CLIENT_METRICS_RESOURCES =>
- new ListClientMetricsResourcesRequest.Builder(new
ListClientMetricsResourcesRequestData())
+ case ApiKeys.LIST_CONFIG_RESOURCES =>
+ new ListConfigResourcesRequest.Builder(new
ListConfigResourcesRequestData())
case ApiKeys.DESCRIBE_TOPIC_PARTITIONS =>
new DescribeTopicPartitionsRequest.Builder(new
DescribeTopicPartitionsRequestData())
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
index 87a45110cd1..80ef0d24a4a 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
@@ -21,6 +21,7 @@ import
org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -68,6 +69,10 @@ public class GroupConfigManager implements AutoCloseable {
return Optional.ofNullable(configMap.get(groupId));
}
+ public List<String> groupIds() {
+ return List.copyOf(configMap.keySet());
+ }
+
/**
* Validate the given properties.
*
diff --git
a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
index 56914f2fe46..e2a76e5caba 100644
--- a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
+++ b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
@@ -130,8 +130,8 @@ import
org.apache.kafka.common.message.JoinGroupRequestDataJsonConverter;
import org.apache.kafka.common.message.JoinGroupResponseDataJsonConverter;
import org.apache.kafka.common.message.LeaveGroupRequestDataJsonConverter;
import org.apache.kafka.common.message.LeaveGroupResponseDataJsonConverter;
-import
org.apache.kafka.common.message.ListClientMetricsResourcesRequestDataJsonConverter;
-import
org.apache.kafka.common.message.ListClientMetricsResourcesResponseDataJsonConverter;
+import
org.apache.kafka.common.message.ListConfigResourcesRequestDataJsonConverter;
+import
org.apache.kafka.common.message.ListConfigResourcesResponseDataJsonConverter;
import org.apache.kafka.common.message.ListGroupsRequestDataJsonConverter;
import org.apache.kafka.common.message.ListGroupsResponseDataJsonConverter;
import org.apache.kafka.common.message.ListOffsetsRequestDataJsonConverter;
@@ -312,8 +312,8 @@ import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
-import org.apache.kafka.common.requests.ListClientMetricsResourcesRequest;
-import org.apache.kafka.common.requests.ListClientMetricsResourcesResponse;
+import org.apache.kafka.common.requests.ListConfigResourcesRequest;
+import org.apache.kafka.common.requests.ListConfigResourcesResponse;
import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
@@ -507,8 +507,8 @@ public class RequestConvertToJson {
return
JoinGroupRequestDataJsonConverter.write(((JoinGroupRequest) request).data(),
request.version());
case LEAVE_GROUP:
return
LeaveGroupRequestDataJsonConverter.write(((LeaveGroupRequest) request).data(),
request.version());
- case LIST_CLIENT_METRICS_RESOURCES:
- return
ListClientMetricsResourcesRequestDataJsonConverter.write(((ListClientMetricsResourcesRequest)
request).data(), request.version());
+ case LIST_CONFIG_RESOURCES:
+ return
ListConfigResourcesRequestDataJsonConverter.write(((ListConfigResourcesRequest)
request).data(), request.version());
case LIST_GROUPS:
return
ListGroupsRequestDataJsonConverter.write(((ListGroupsRequest) request).data(),
request.version());
case LIST_OFFSETS:
@@ -693,8 +693,8 @@ public class RequestConvertToJson {
return
JoinGroupResponseDataJsonConverter.write(((JoinGroupResponse) response).data(),
version);
case LEAVE_GROUP:
return
LeaveGroupResponseDataJsonConverter.write(((LeaveGroupResponse)
response).data(), version);
- case LIST_CLIENT_METRICS_RESOURCES:
- return
ListClientMetricsResourcesResponseDataJsonConverter.write(((ListClientMetricsResourcesResponse)
response).data(), version);
+ case LIST_CONFIG_RESOURCES:
+ return
ListConfigResourcesResponseDataJsonConverter.write(((ListConfigResourcesResponse)
response).data(), version);
case LIST_GROUPS:
return
ListGroupsResponseDataJsonConverter.write(((ListGroupsResponse)
response).data(), version);
case LIST_OFFSETS: