This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 30d7c71f09d KAFKA-18904: Add Admin#listConfigResources [2/N] (#19743)
30d7c71f09d is described below

commit 30d7c71f09d1f87daad03f44313f172f8b944f5a
Author: PoAn Yang <[email protected]>
AuthorDate: Thu May 22 10:05:35 2025 -0500

    KAFKA-18904: Add Admin#listConfigResources [2/N] (#19743)
    
    * Add new functions `listConfigResources(Set<ConfigResource.Type>
    configResourceTypes, ListConfigResourcesOptions options)` and
    `listConfigResources()` to `Admin` interface.
      * New functions can list all kinds of config resource types.
      * If the input set contains a type other than `CLIENT_METRICS` and
    the request version is 0, return `UnsupportedVersionException`.
    * Deprecate functions
    `listClientMetricsResources(ListClientMetricsResourcesOptions options)`
    and `listClientMetricsResources()`.
    * Deprecate classes `ListClientMetricsResourcesResult` and
    `ClientMetricsResourceListing`.
    * Change `ClientMetricsCommand` to use `listConfigResources`.
    * Add integration tests to `PlaintextAdminIntegrationTest.java`.
    * Add unit tests to `KafkaAdminClientTest.java`.
    
    Reviewers: Andrew Schofield <[email protected]>
    
    ---------
    
    Signed-off-by: PoAn Yang <[email protected]>
---
 .../java/org/apache/kafka/clients/admin/Admin.java |  26 +
 .../admin/ClientMetricsResourceListing.java        |   1 +
 .../kafka/clients/admin/ForwardingAdmin.java       |   6 +
 .../kafka/clients/admin/KafkaAdminClient.java      | 659 +++++++++++----------
 .../admin/ListClientMetricsResourcesOptions.java   |   2 +
 .../admin/ListClientMetricsResourcesResult.java    |   2 +
 ...ptions.java => ListConfigResourcesOptions.java} |   5 +-
 ...sResult.java => ListConfigResourcesResult.java} |  21 +-
 .../requests/ListConfigResourcesRequest.java       |  11 +
 .../requests/ListConfigResourcesResponse.java      |   9 -
 .../kafka/clients/admin/AdminClientTestUtils.java  |  16 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  67 +++
 .../kafka/clients/admin/MockAdminClient.java       |   6 +
 .../kafka/common/requests/RequestResponseTest.java |  26 +-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  |  86 +++
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   3 +-
 .../TestingMetricsInterceptingAdminClient.java     |   8 +
 .../apache/kafka/tools/ClientMetricsCommand.java   |  19 +-
 .../kafka/tools/ClientMetricsCommandTest.java      |  14 +-
 19 files changed, 631 insertions(+), 356 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 84553dbe50a..3f90405bc65 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -1775,12 +1775,36 @@ public interface Admin extends AutoCloseable {
     FenceProducersResult fenceProducers(Collection<String> transactionalIds,
                                         FenceProducersOptions options);
 
+    /**
+     * List the configuration resources available in the cluster which match 
the given config resource types.
+     * If no config resource types are specified, all configuration resources 
will be listed.
+     *
+     * @param configResourceTypes The set of configuration resource types to 
list.
+     * @param options The options to use when listing the configuration 
resources.
+     * @return The ListConfigResourcesResult.
+     */
+    ListConfigResourcesResult listConfigResources(Set<ConfigResource.Type> 
configResourceTypes, ListConfigResourcesOptions options);
+
+    /**
+     * List all configuration resources available in the cluster with the 
default options.
+     * <p>
+     * This is a convenience method for {@link #listConfigResources(Set, 
ListConfigResourcesOptions)}
+     * with default options. See the overload for more details.
+     *
+     * @return The ListConfigResourcesResult.
+     */
+    default ListConfigResourcesResult listConfigResources() {
+        return listConfigResources(Set.of(), new ListConfigResourcesOptions());
+    }
+
     /**
      * List the client metrics configuration resources available in the 
cluster.
      *
      * @param options The options to use when listing the client metrics 
resources.
      * @return The ListClientMetricsResourcesResult.
+     * @deprecated Since 4.1. Use {@link #listConfigResources(Set, 
ListConfigResourcesOptions)} instead.
      */
+    @Deprecated(since = "4.1", forRemoval = true)
     ListClientMetricsResourcesResult 
listClientMetricsResources(ListClientMetricsResourcesOptions options);
 
     /**
@@ -1790,7 +1814,9 @@ public interface Admin extends AutoCloseable {
      * with default options. See the overload for more details.
      *
      * @return The ListClientMetricsResourcesResult.
+     * @deprecated Since 4.1. Use {@link #listConfigResources()} instead.
      */
+    @Deprecated(since = "4.1", forRemoval = true)
     default ListClientMetricsResourcesResult listClientMetricsResources() {
         return listClientMetricsResources(new 
ListClientMetricsResourcesOptions());
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ClientMetricsResourceListing.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ClientMetricsResourceListing.java
index b5c85b58732..d5af97080b0 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/ClientMetricsResourceListing.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ClientMetricsResourceListing.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.admin;
 
 import java.util.Objects;
 
+@Deprecated(since = "4.1")
 public class ClientMetricsResourceListing {
     private final String name;
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
index 5ac988e0acd..b99e4f6587b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
@@ -300,6 +300,12 @@ public class ForwardingAdmin implements Admin {
         return delegate.fenceProducers(transactionalIds, options);
     }
 
+    @Override
+    public ListConfigResourcesResult 
listConfigResources(Set<ConfigResource.Type> configResourceTypes, 
ListConfigResourcesOptions options) {
+        return delegate.listConfigResources(configResourceTypes, options);
+    }
+
+    @SuppressWarnings({"deprecation", "removal"})
     @Override
     public ListClientMetricsResourcesResult 
listClientMetricsResources(ListClientMetricsResourcesOptions options) {
         return delegate.listClientMetricsResources(options);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 0abad889c85..b283d65cbee 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -419,11 +419,11 @@ public class KafkaAdminClient extends AdminClient {
     /**
      * Get or create a list value from a map.
      *
-     * @param map   The map to get or create the element from.
-     * @param key   The key.
-     * @param <K>   The key type.
-     * @param <V>   The value type.
-     * @return      The list value.
+     * @param map The map to get or create the element from.
+     * @param key The key.
+     * @param <K> The key type.
+     * @param <V> The value type.
+     * @return The list value.
      */
     static <K, V> List<V> getOrCreateListValue(Map<K, List<V>> map, K key) {
         return map.computeIfAbsent(key, k -> new LinkedList<>());
@@ -432,9 +432,9 @@ public class KafkaAdminClient extends AdminClient {
     /**
      * Send an exception to every element in a collection of KafkaFutureImpls.
      *
-     * @param futures   The collection of KafkaFutureImpl objects.
-     * @param exc       The exception
-     * @param <T>       The KafkaFutureImpl result type.
+     * @param futures The collection of KafkaFutureImpl objects.
+     * @param exc     The exception
+     * @param <T>     The KafkaFutureImpl result type.
      */
     private static <T> void 
completeAllExceptionally(Collection<KafkaFutureImpl<T>> futures, Throwable exc) 
{
         completeAllExceptionally(futures.stream(), exc);
@@ -443,9 +443,9 @@ public class KafkaAdminClient extends AdminClient {
     /**
      * Send an exception to all futures in the provided stream
      *
-     * @param futures   The stream of KafkaFutureImpl objects.
-     * @param exc       The exception
-     * @param <T>       The KafkaFutureImpl result type.
+     * @param futures The stream of KafkaFutureImpl objects.
+     * @param exc     The exception
+     * @param <T>     The KafkaFutureImpl result type.
      */
     private static <T> void 
completeAllExceptionally(Stream<KafkaFutureImpl<T>> futures, Throwable exc) {
         futures.forEach(future -> future.completeExceptionally(exc));
@@ -454,9 +454,9 @@ public class KafkaAdminClient extends AdminClient {
     /**
      * Get the current time remaining before a deadline as an integer.
      *
-     * @param now           The current time in milliseconds.
-     * @param deadlineMs    The deadline time in milliseconds.
-     * @return              The time delta in milliseconds.
+     * @param now        The current time in milliseconds.
+     * @param deadlineMs The deadline time in milliseconds.
+     * @return The time delta in milliseconds.
      */
     static int calcTimeoutMsRemainingAsInt(long now, long deadlineMs) {
         long deltaMs = deadlineMs - now;
@@ -470,9 +470,8 @@ public class KafkaAdminClient extends AdminClient {
     /**
      * Generate the client id based on the configuration.
      *
-     * @param config    The configuration
-     *
-     * @return          The client id
+     * @param config The configuration
+     * @return The client id
      */
     static String generateClientId(AdminClientConfig config) {
         String clientId = config.getString(AdminClientConfig.CLIENT_ID_CONFIG);
@@ -488,10 +487,9 @@ public class KafkaAdminClient extends AdminClient {
     /**
      * Get the deadline for a particular call.
      *
-     * @param now               The current time in milliseconds.
-     * @param optionTimeoutMs   The timeout option given by the user.
-     *
-     * @return                  The deadline in milliseconds.
+     * @param now             The current time in milliseconds.
+     * @param optionTimeoutMs The timeout option given by the user.
+     * @return The deadline in milliseconds.
      */
     private long calcDeadlineMs(long now, Integer optionTimeoutMs) {
         if (optionTimeoutMs != null)
@@ -502,9 +500,8 @@ public class KafkaAdminClient extends AdminClient {
     /**
      * Pretty-print an exception.
      *
-     * @param throwable     The exception.
-     *
-     * @return              A compact human-readable string.
+     * @param throwable The exception.
+     * @return A compact human-readable string.
      */
     static String prettyPrintException(Throwable throwable) {
         if (throwable == null)
@@ -550,7 +547,7 @@ public class KafkaAdminClient extends AdminClient {
                 
.recordLevel(Sensor.RecordingLevel.forName(config.getString(AdminClientConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                 .tags(metricTags);
             MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
-                    
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+                
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
             metrics = new Metrics(metricConfig, reporters, time, 
metricsContext);
             networkClient = ClientUtils.createNetworkClient(config,
                 clientId,
@@ -656,11 +653,11 @@ public class KafkaAdminClient extends AdminClient {
         if (defaultApiTimeoutMs < requestTimeoutMs) {
             if 
(config.originals().containsKey(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG))
 {
                 throw new ConfigException("The specified value of " + 
AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG +
-                        " must be no smaller than the value of " + 
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG + ".");
+                    " must be no smaller than the value of " + 
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG + ".");
             } else {
                 log.warn("Overriding the default value for {} ({}) with the 
explicitly configured request timeout {}",
-                        AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 
this.defaultApiTimeoutMs,
-                        requestTimeoutMs);
+                    AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 
this.defaultApiTimeoutMs,
+                    requestTimeoutMs);
                 return requestTimeoutMs;
             }
         }
@@ -718,6 +715,7 @@ public class KafkaAdminClient extends AdminClient {
      */
     private interface NodeProvider {
         Node provide();
+
         boolean supportsUseControllers();
     }
 
@@ -727,7 +725,7 @@ public class KafkaAdminClient extends AdminClient {
             long now = time.milliseconds();
             LeastLoadedNode leastLoadedNode = client.leastLoadedNode(now);
             if (metadataRecoveryStrategy == 
MetadataRecoveryStrategy.REBOOTSTRAP
-                    && !leastLoadedNode.hasNodeAvailableOrConnectionReady()) {
+                && !leastLoadedNode.hasNodeAvailableOrConnectionReady()) {
                 metadataManager.rebootstrap(now);
             }
 
@@ -757,7 +755,7 @@ public class KafkaAdminClient extends AdminClient {
         @Override
         public Node provide() {
             if (metadataManager.isReady() &&
-                    (metadataManager.nodeById(nodeId) != null)) {
+                (metadataManager.nodeById(nodeId) != null)) {
                 return metadataManager.nodeById(nodeId);
             }
             // If we can't find the node with the given constant ID, we 
schedule a
@@ -791,7 +789,7 @@ public class KafkaAdminClient extends AdminClient {
         @Override
         public Node provide() {
             if (metadataManager.isReady() &&
-                    (metadataManager.controller() != null)) {
+                (metadataManager.controller() != null)) {
                 return metadataManager.controller();
             }
             metadataManager.requestUpdate();
@@ -893,13 +891,13 @@ public class KafkaAdminClient extends AdminClient {
 
         /**
          * Handle a failure.
-         *
+         * <p>
          * Depending on what the exception is and how many times we have 
already tried, we may choose to
          * fail the Call, or retry it. It is important to print the stack 
traces here in some cases,
          * since they are not necessarily preserved in ApiVersionException 
objects.
          *
-         * @param now           The current time in milliseconds.
-         * @param throwable     The failure exception.
+         * @param now       The current time in milliseconds.
+         * @param throwable The failure exception.
          */
         final void fail(long now, Throwable throwable) {
             if (curNode != null) {
@@ -915,7 +913,7 @@ public class KafkaAdminClient extends AdminClient {
             // protocol downgrade will not count against the total number of 
retries we get for
             // this RPC. That is why 'tries' is not incremented.
             if ((throwable instanceof UnsupportedVersionException) &&
-                     
handleUnsupportedVersionException((UnsupportedVersionException) throwable)) {
+                
handleUnsupportedVersionException((UnsupportedVersionException) throwable)) {
                 log.debug("{} attempting protocol downgrade and then retry.", 
this);
                 runnable.pendingCalls.add(this);
                 return;
@@ -969,16 +967,14 @@ public class KafkaAdminClient extends AdminClient {
          * Create an AbstractRequest.Builder for this Call.
          *
          * @param timeoutMs The timeout in milliseconds.
-         *
-         * @return          The AbstractRequest builder.
+         * @return The AbstractRequest builder.
          */
         abstract AbstractRequest.Builder<?> createRequest(int timeoutMs);
 
         /**
          * Process the call response.
          *
-         * @param abstractResponse  The AbstractResponse.
-         *
+         * @param abstractResponse The AbstractResponse.
          */
         abstract void handleResponse(AbstractResponse abstractResponse);
 
@@ -986,16 +982,15 @@ public class KafkaAdminClient extends AdminClient {
          * Handle a failure. This will only be called if the failure exception 
was not
          * retriable, or if we hit a timeout.
          *
-         * @param throwable     The exception.
+         * @param throwable The exception.
          */
         abstract void handleFailure(Throwable throwable);
 
         /**
          * Handle an UnsupportedVersionException.
          *
-         * @param exception     The exception.
-         *
-         * @return              True if the exception can be handled; false 
otherwise.
+         * @param exception The exception.
+         * @return True if the exception can be handled; false otherwise.
          */
         boolean handleUnsupportedVersionException(UnsupportedVersionException 
exception) {
             return false;
@@ -1032,7 +1027,7 @@ public class KafkaAdminClient extends AdminClient {
         /**
          * Create a new timeout processor.
          *
-         * @param now           The current time in milliseconds since the 
epoch.
+         * @param now The current time in milliseconds since the epoch.
          */
         TimeoutProcessor(long now) {
             this.now = now;
@@ -1044,9 +1039,8 @@ public class KafkaAdminClient extends AdminClient {
          * Timed out calls will be removed and failed.
          * The remaining milliseconds until the next timeout will be updated.
          *
-         * @param calls         The collection of calls.
-         *
-         * @return              The number of calls which were timed out.
+         * @param calls The collection of calls.
+         * @return The number of calls which were timed out.
          */
         int handleTimeouts(Collection<Call> calls, String msg) {
             int numTimedOut = 0;
@@ -1068,9 +1062,8 @@ public class KafkaAdminClient extends AdminClient {
          * Check whether a call should be timed out.
          * The remaining milliseconds until the next timeout will be updated.
          *
-         * @param call      The call.
-         *
-         * @return          True if the call should be timed out.
+         * @param call The call.
+         * @return True if the call should be timed out.
          */
         boolean callHasExpired(Call call) {
             int remainingMs = calcTimeoutMsRemainingAsInt(now, 
call.deadlineMs);
@@ -1130,7 +1123,7 @@ public class KafkaAdminClient extends AdminClient {
         /**
          * Time out the elements in the pendingCalls list which are expired.
          *
-         * @param processor     The timeout processor.
+         * @param processor The timeout processor.
          */
         private void timeoutPendingCalls(TimeoutProcessor processor) {
             int numTimedOut = processor.handleTimeouts(pendingCalls, "Timed 
out waiting for a node assignment.");
@@ -1141,7 +1134,7 @@ public class KafkaAdminClient extends AdminClient {
         /**
          * Time out calls which have been assigned to nodes.
          *
-         * @param processor     The timeout processor.
+         * @param processor The timeout processor.
          */
         private int timeoutCallsToSend(TimeoutProcessor processor) {
             int numTimedOut = 0;
@@ -1156,7 +1149,7 @@ public class KafkaAdminClient extends AdminClient {
 
         /**
          * Drain all the calls from newCalls into pendingCalls.
-         *
+         * <p>
          * This function holds the lock for the minimum amount of time, to 
avoid blocking
          * users of AdminClient who will also take the lock to add new calls.
          */
@@ -1168,7 +1161,7 @@ public class KafkaAdminClient extends AdminClient {
          * Add some calls to pendingCalls, and then clear the input list.
          * Also clears Call#curNode.
          *
-         * @param calls         The calls to add.
+         * @param calls The calls to add.
          */
         private void transitionToPendingAndClearList(List<Call> calls) {
             for (Call call : calls) {
@@ -1181,9 +1174,9 @@ public class KafkaAdminClient extends AdminClient {
         /**
          * Choose nodes for the calls in the pendingCalls list.
          *
-         * @param now           The current time in milliseconds.
-         * @return              The minimum time until a call is ready to be 
retried if any of the pending
-         *                      calls are backing off after a failure
+         * @param now The current time in milliseconds.
+         * @return The minimum time until a call is ready to be retried if any 
of the pending
+         * calls are backing off after a failure
          */
         private long maybeDrainPendingCalls(long now) {
             long pollTimeout = Long.MAX_VALUE;
@@ -1241,8 +1234,8 @@ public class KafkaAdminClient extends AdminClient {
         /**
          * Send the calls which are ready.
          *
-         * @param now                   The current time in milliseconds.
-         * @return                      The minimum timeout we need for poll().
+         * @param now The current time in milliseconds.
+         * @return The minimum timeout we need for poll().
          */
         private long sendEligibleCalls(long now) {
             long pollTimeout = Long.MAX_VALUE;
@@ -1264,7 +1257,7 @@ public class KafkaAdminClient extends AdminClient {
                     if (deadline != null) {
                         if (now >= deadline) {
                             log.info("Disconnecting from {} and revoking {} 
node assignment(s) " +
-                                "because the node is taking too long to become 
ready.",
+                                    "because the node is taking too long to 
become ready.",
                                 node.idString(), calls.size());
                             transitionToPendingAndClearList(calls);
                             client.disconnect(node.idString());
@@ -1317,12 +1310,12 @@ public class KafkaAdminClient extends AdminClient {
 
         /**
          * Time out expired calls that are in flight.
-         *
+         * <p>
          * Calls that are in flight may have been partially or completely sent 
over the wire. They may
          * even be in the process of being processed by the remote server. At 
the moment, our only option
          * to time them out is to close the entire connection.
          *
-         * @param processor         The timeout processor.
+         * @param processor The timeout processor.
          */
         private void timeoutCallsInFlight(TimeoutProcessor processor) {
             int numTimedOut = 0;
@@ -1345,8 +1338,8 @@ public class KafkaAdminClient extends AdminClient {
         /**
          * Handle responses from the server.
          *
-         * @param now                   The current time in milliseconds.
-         * @param responses             The latest responses from KafkaClient.
+         * @param now       The current time in milliseconds.
+         * @param responses The latest responses from KafkaClient.
          */
         private void handleResponses(long now, List<ClientResponse> responses) 
{
             for (ClientResponse response : responses) {
@@ -1357,7 +1350,7 @@ public class KafkaAdminClient extends AdminClient {
                     // If the server returns information about a correlation 
ID we didn't use yet,
                     // an internal server error has occurred. Close the 
connection and log an error message.
                     log.error("Internal server error on {}: server returned 
information about unknown " +
-                        "correlation ID {}, requestHeader = {}", 
response.destination(), correlationId,
+                            "correlation ID {}, requestHeader = {}", 
response.destination(), correlationId,
                         response.requestHeader());
                     client.disconnect(response.destination());
                     continue;
@@ -1476,7 +1469,7 @@ public class KafkaAdminClient extends AdminClient {
                 numTimedOut += timeoutProcessor.handleTimeouts(pendingCalls, 
"The AdminClient thread has exited.");
                 numTimedOut += timeoutCallsToSend(timeoutProcessor);
                 numTimedOut += 
timeoutProcessor.handleTimeouts(correlationIdToCalls.values(),
-                        "The AdminClient thread has exited.");
+                    "The AdminClient thread has exited.");
                 if (numTimedOut > 0) {
                     log.info("Timed out {} remaining operation(s) during 
close.", numTimedOut);
                 }
@@ -1546,13 +1539,13 @@ public class KafkaAdminClient extends AdminClient {
 
         /**
          * Queue a call for sending.
-         *
+         * <p>
          * If the AdminClient thread has exited, this will fail. Otherwise, it 
will succeed (even
          * if the AdminClient is shutting down). This function should called 
when retrying an
          * existing call.
          *
-         * @param call      The new call object.
-         * @param now       The current time in milliseconds.
+         * @param call The new call object.
+         * @param now  The current time in milliseconds.
          */
         void enqueue(Call call, long now) {
             if (call.tries > maxRetries) {
@@ -1583,18 +1576,18 @@ public class KafkaAdminClient extends AdminClient {
 
         /**
          * Initiate a new call.
-         *
+         * <p>
          * This will fail if the AdminClient is scheduled to shut down.
          *
-         * @param call      The new call object.
-         * @param now       The current time in milliseconds.
+         * @param call The new call object.
+         * @param now  The current time in milliseconds.
          */
         void call(Call call, long now) {
             if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) {
                 log.debug("Cannot accept new call {} when AdminClient is 
closing.", call);
                 call.handleFailure(new IllegalStateException("Cannot accept 
new calls when AdminClient is closing."));
             } else if (metadataManager.usingBootstrapControllers() &&
-                    (!call.nodeProvider.supportsUseControllers())) {
+                (!call.nodeProvider.supportsUseControllers())) {
                 call.fail(now, new UnsupportedEndpointTypeException("This 
Admin API is not " +
                     "yet supported when communicating directly with the 
controller quorum."));
             } else {
@@ -1616,7 +1609,7 @@ public class KafkaAdminClient extends AdminClient {
         private Call makeControllerMetadataCall(long now) {
             // Use DescribeCluster here, as specified by KIP-919.
             return new Call(true, "describeCluster", calcDeadlineMs(now, 
requestTimeoutMs),
-                    new MetadataUpdateNodeIdProvider()) {
+                new MetadataUpdateNodeIdProvider()) {
                 @Override
                 public DescribeClusterRequest.Builder createRequest(int 
timeoutMs) {
                     return new DescribeClusterRequest.Builder(new 
DescribeClusterRequestData()
@@ -1659,7 +1652,7 @@ public class KafkaAdminClient extends AdminClient {
             // We use MetadataRequest here so that we can continue to support 
brokers that are too
             // old to handle DescribeCluster.
             return new Call(true, "fetchMetadata", calcDeadlineMs(now, 
requestTimeoutMs),
-                    new MetadataUpdateNodeIdProvider()) {
+                new MetadataUpdateNodeIdProvider()) {
                 @Override
                 public MetadataRequest.Builder createRequest(int timeoutMs) {
                     // Since this only requests node information, it's safe to 
pass true
@@ -1748,10 +1741,10 @@ public class KafkaAdminClient extends AdminClient {
      * Used when a response handler expected a result for some entity but no 
result was present.
      */
     private static <K, V> void completeUnrealizedFutures(
-            Stream<Map.Entry<K, KafkaFutureImpl<V>>> futures,
-            Function<K, String> messageFormatter) {
+        Stream<Map.Entry<K, KafkaFutureImpl<V>>> futures,
+        Function<K, String> messageFormatter) {
         futures.filter(entry -> !entry.getValue().isDone()).forEach(entry ->
-                entry.getValue().completeExceptionally(new 
ApiException(messageFormatter.apply(entry.getKey()))));
+            entry.getValue().completeExceptionally(new 
ApiException(messageFormatter.apply(entry.getKey()))));
     }
 
     /**
@@ -1759,11 +1752,11 @@ public class KafkaAdminClient extends AdminClient {
      * the initial error back to the caller if the request timed out.
      */
     private static <K, V> void maybeCompleteQuotaExceededException(
-            boolean shouldRetryOnQuotaViolation,
-            Throwable throwable,
-            Map<K, KafkaFutureImpl<V>> futures,
-            Map<K, ThrottlingQuotaExceededException> quotaExceededExceptions,
-            int throttleTimeDelta) {
+        boolean shouldRetryOnQuotaViolation,
+        Throwable throwable,
+        Map<K, KafkaFutureImpl<V>> futures,
+        Map<K, ThrottlingQuotaExceededException> quotaExceededExceptions,
+        int throttleTimeDelta) {
         if (shouldRetryOnQuotaViolation && throwable instanceof 
TimeoutException) {
             quotaExceededExceptions.forEach((key, value) -> 
futures.get(key).completeExceptionally(
                 new ThrottlingQuotaExceededException(
@@ -2042,10 +2035,10 @@ public class KafkaAdminClient extends AdminClient {
             @Override
             DeleteTopicsRequest.Builder createRequest(int timeoutMs) {
                 return new DeleteTopicsRequest.Builder(
-                        new DeleteTopicsRequestData()
-                                .setTopics(topicIds.stream().map(
-                                    topic -> new 
DeleteTopicState().setTopicId(topic)).collect(Collectors.toList()))
-                                .setTimeoutMs(timeoutMs));
+                    new DeleteTopicsRequestData()
+                        .setTopics(topicIds.stream().map(
+                            topic -> new 
DeleteTopicState().setTopicId(topic)).collect(Collectors.toList()))
+                        .setTimeoutMs(timeoutMs));
             }
 
             @Override
@@ -2065,7 +2058,7 @@ public class KafkaAdminClient extends AdminClient {
                         if (error.isFailure()) {
                             if (error.is(Errors.THROTTLING_QUOTA_EXCEEDED)) {
                                 ThrottlingQuotaExceededException 
quotaExceededException = new ThrottlingQuotaExceededException(
-                                        response.throttleTimeMs(), 
error.messageWithFallback());
+                                    response.throttleTimeMs(), 
error.messageWithFallback());
                                 if (options.shouldRetryOnQuotaViolation()) {
                                     retryTopics.add(result.topicId());
                                     
retryTopicQuotaExceededExceptions.put(result.topicId(), quotaExceededException);
@@ -2088,7 +2081,7 @@ public class KafkaAdminClient extends AdminClient {
                 } else {
                     final long now = time.milliseconds();
                     final Call call = getDeleteTopicsWithIdsCall(options, 
futures, retryTopics,
-                            retryTopicQuotaExceededExceptions, now, deadline);
+                        retryTopicQuotaExceededExceptions, now, deadline);
                     runnable.call(call, now);
                 }
             }
@@ -2098,7 +2091,7 @@ public class KafkaAdminClient extends AdminClient {
                 // If there were any topics retries due to a quota exceeded 
exception, we propagate
                 // the initial error back to the caller if the request timed 
out.
                 
maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(),
-                        throwable, futures, quotaExceededExceptions, (int) 
(time.milliseconds() - now));
+                    throwable, futures, quotaExceededExceptions, (int) 
(time.milliseconds() - now));
                 // Fail all the other remaining futures
                 completeAllExceptionally(futures.values(), throwable);
             }
@@ -2285,7 +2278,7 @@ public class KafkaAdminClient extends AdminClient {
                 }
 
                 if (partiallyFinishedTopicDescription != null &&
-                        (responseCursor == null || 
!responseCursor.topicName().equals(partiallyFinishedTopicDescription.name()))) {
+                    (responseCursor == null || 
!responseCursor.topicName().equals(partiallyFinishedTopicDescription.name()))) {
                     // We can't simply check nextTopicDescription != null here 
to close the partiallyFinishedTopicDescription.
                     // Because the responseCursor topic may not show in the 
response.
                     String topicName = 
partiallyFinishedTopicDescription.name();
@@ -2368,7 +2361,7 @@ public class KafkaAdminClient extends AdminClient {
             if (topicIdIsUnrepresentable(topicId)) {
                 KafkaFutureImpl<TopicDescription> future = new 
KafkaFutureImpl<>();
                 future.completeExceptionally(new InvalidTopicException("The 
given topic id '" +
-                        topicId + "' cannot be represented in a request."));
+                    topicId + "' cannot be represented in a request."));
                 topicFutures.put(topicId, future);
             } else if (!topicFutures.containsKey(topicId)) {
                 topicFutures.put(topicId, new KafkaFutureImpl<>());
@@ -2377,14 +2370,14 @@ public class KafkaAdminClient extends AdminClient {
         }
         final long now = time.milliseconds();
         Call call = new Call("describeTopicsWithIds", calcDeadlineMs(now, 
options.timeoutMs()),
-                new LeastLoadedNodeProvider()) {
+            new LeastLoadedNodeProvider()) {
 
             @Override
             MetadataRequest.Builder createRequest(int timeoutMs) {
                 return new MetadataRequest.Builder(new MetadataRequestData()
-                        
.setTopics(convertTopicIdsToMetadataRequestTopic(topicIdsList))
-                        .setAllowAutoTopicCreation(false)
-                        
.setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations()));
+                    
.setTopics(convertTopicIdsToMetadataRequestTopic(topicIdsList))
+                    .setAllowAutoTopicCreation(false)
+                    
.setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations()));
             }
 
             @Override
@@ -2446,8 +2439,8 @@ public class KafkaAdminClient extends AdminClient {
         List<TopicPartitionInfo> partitions = new 
ArrayList<>(partitionInfos.size());
         for (PartitionInfo partitionInfo : partitionInfos) {
             TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(
-                    partitionInfo.partition(), leader(partitionInfo), 
Arrays.asList(partitionInfo.replicas()),
-                    Arrays.asList(partitionInfo.inSyncReplicas()));
+                partitionInfo.partition(), leader(partitionInfo), 
Arrays.asList(partitionInfo.replicas()),
+                Arrays.asList(partitionInfo.inSyncReplicas()));
             partitions.add(topicPartitionInfo);
         }
         
partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
@@ -2482,7 +2475,7 @@ public class KafkaAdminClient extends AdminClient {
                     return new DescribeClusterRequest.Builder(new 
DescribeClusterRequestData()
                         
.setIncludeClusterAuthorizedOperations(options.includeAuthorizedOperations())
                         
.setEndpointType(metadataManager.usingBootstrapControllers() ?
-                                EndpointType.CONTROLLER.id() : 
EndpointType.BROKER.id())
+                            EndpointType.CONTROLLER.id() : 
EndpointType.BROKER.id())
                         
.setIncludeFencedBrokers(options.includeFencedBrokers()));
                 } else {
                     // Since this only requests node information, it's safe to 
pass true for allowAutoTopicCreation (and it
@@ -2566,7 +2559,7 @@ public class KafkaAdminClient extends AdminClient {
         if (filter.isUnknown()) {
             KafkaFutureImpl<Collection<AclBinding>> future = new 
KafkaFutureImpl<>();
             future.completeExceptionally(new InvalidRequestException("The 
AclBindingFilter " +
-                    "must not contain UNKNOWN elements."));
+                "must not contain UNKNOWN elements."));
             return new DescribeAclsResult(future);
         }
         final long now = time.milliseconds();
@@ -2766,15 +2759,15 @@ public class KafkaAdminClient extends AdminClient {
                         if (future == null) {
                             if (node != null) {
                                 log.warn("The config {} in the response from 
node {} is not in the request",
-                                        configResource, node);
+                                    configResource, node);
                             } else {
                                 log.warn("The config {} in the response from 
the least loaded broker is not in the request",
-                                        configResource);
+                                    configResource);
                             }
                         } else {
                             if (describeConfigsResult.errorCode() != 
Errors.NONE.code()) {
                                 
future.completeExceptionally(Errors.forCode(describeConfigsResult.errorCode())
-                                        
.exception(describeConfigsResult.errorMessage()));
+                                    
.exception(describeConfigsResult.errorMessage()));
                             } else {
                                 
future.complete(describeConfigResult(describeConfigsResult));
                             }
@@ -2810,15 +2803,15 @@ public class KafkaAdminClient extends AdminClient {
 
     private Config 
describeConfigResult(DescribeConfigsResponseData.DescribeConfigsResult 
describeConfigsResult) {
         return new Config(describeConfigsResult.configs().stream().map(config 
-> new ConfigEntry(
-                config.name(),
-                config.value(),
-                
DescribeConfigsResponse.ConfigSource.forId(config.configSource()).source(),
-                config.isSensitive(),
-                config.readOnly(),
-                (config.synonyms().stream().map(synonym -> new 
ConfigEntry.ConfigSynonym(synonym.name(), synonym.value(),
-                        
DescribeConfigsResponse.ConfigSource.forId(synonym.source()).source()))).collect(Collectors.toList()),
-                
DescribeConfigsResponse.ConfigType.forId(config.configType()).type(),
-                config.documentation()
+            config.name(),
+            config.value(),
+            
DescribeConfigsResponse.ConfigSource.forId(config.configSource()).source(),
+            config.isSensitive(),
+            config.readOnly(),
+            (config.synonyms().stream().map(synonym -> new 
ConfigEntry.ConfigSynonym(synonym.name(), synonym.value(),
+                
DescribeConfigsResponse.ConfigSource.forId(synonym.source()).source()))).collect(Collectors.toList()),
+            
DescribeConfigsResponse.ConfigType.forId(config.configType()).type(),
+            config.documentation()
         )).collect(Collectors.toList()));
     }
 
@@ -2930,7 +2923,7 @@ public class KafkaAdminClient extends AdminClient {
             futures.put(replica, new KafkaFutureImpl<>());
 
         Map<Integer, AlterReplicaLogDirsRequestData> replicaAssignmentByBroker 
= new HashMap<>();
-        for (Map.Entry<TopicPartitionReplica, String> entry: 
replicaAssignment.entrySet()) {
+        for (Map.Entry<TopicPartitionReplica, String> entry : 
replicaAssignment.entrySet()) {
             TopicPartitionReplica replica = entry.getKey();
             String logDir = entry.getValue();
             int brokerId = replica.brokerId();
@@ -2951,7 +2944,7 @@ public class KafkaAdminClient extends AdminClient {
         }
 
         final long now = time.milliseconds();
-        for (Map.Entry<Integer, AlterReplicaLogDirsRequestData> entry: 
replicaAssignmentByBroker.entrySet()) {
+        for (Map.Entry<Integer, AlterReplicaLogDirsRequestData> entry : 
replicaAssignmentByBroker.entrySet()) {
             final int brokerId = entry.getKey();
             final AlterReplicaLogDirsRequestData assignment = entry.getValue();
 
@@ -2966,15 +2959,15 @@ public class KafkaAdminClient extends AdminClient {
                 @Override
                 public void handleResponse(AbstractResponse abstractResponse) {
                     AlterReplicaLogDirsResponse response = 
(AlterReplicaLogDirsResponse) abstractResponse;
-                    for (AlterReplicaLogDirTopicResult topicResult: 
response.data().results()) {
-                        for (AlterReplicaLogDirPartitionResult 
partitionResult: topicResult.partitions()) {
+                    for (AlterReplicaLogDirTopicResult topicResult : 
response.data().results()) {
+                        for (AlterReplicaLogDirPartitionResult partitionResult 
: topicResult.partitions()) {
                             TopicPartitionReplica replica = new 
TopicPartitionReplica(
-                                    topicResult.topicName(), 
partitionResult.partitionIndex(), brokerId);
+                                topicResult.topicName(), 
partitionResult.partitionIndex(), brokerId);
                             KafkaFutureImpl<Void> future = 
futures.get(replica);
                             if (future == null) {
                                 log.warn("The partition {} in the response 
from broker {} is not in the request",
-                                        new 
TopicPartition(topicResult.topicName(), partitionResult.partitionIndex()),
-                                        brokerId);
+                                    new 
TopicPartition(topicResult.topicName(), partitionResult.partitionIndex()),
+                                    brokerId);
                             } else if (partitionResult.errorCode() == 
Errors.NONE.code()) {
                                 future.complete(null);
                             } else {
@@ -2986,8 +2979,9 @@ public class KafkaAdminClient extends AdminClient {
                     completeUnrealizedFutures(
                         futures.entrySet().stream().filter(entry -> 
entry.getKey().brokerId() == brokerId),
                         replica -> "The response from broker " + brokerId +
-                                " did not contain a result for replica " + 
replica);
+                            " did not contain a result for replica " + 
replica);
                 }
+
                 @Override
                 void handleFailure(Throwable throwable) {
                     // Only completes the futures of brokerId
@@ -3030,11 +3024,12 @@ public class KafkaAdminClient extends AdminClient {
                     } else {
                         // Up to v3 DescribeLogDirsResponse did not have an 
error code field, hence it defaults to None
                         Errors error = response.data().errorCode() == 
Errors.NONE.code()
-                                ? Errors.CLUSTER_AUTHORIZATION_FAILED
-                                : Errors.forCode(response.data().errorCode());
+                            ? Errors.CLUSTER_AUTHORIZATION_FAILED
+                            : Errors.forCode(response.data().errorCode());
                         future.completeExceptionally(error.exception());
                     }
                 }
+
                 @Override
                 void handleFailure(Throwable throwable) {
                     future.completeExceptionally(throwable);
@@ -3052,15 +3047,15 @@ public class KafkaAdminClient extends AdminClient {
             for (DescribeLogDirsResponseData.DescribeLogDirsTopic t : 
logDirResult.topics()) {
                 for (DescribeLogDirsResponseData.DescribeLogDirsPartition p : 
t.partitions()) {
                     replicaInfoMap.put(
-                            new TopicPartition(t.name(), p.partitionIndex()),
-                            new ReplicaInfo(p.partitionSize(), p.offsetLag(), 
p.isFutureKey()));
+                        new TopicPartition(t.name(), p.partitionIndex()),
+                        new ReplicaInfo(p.partitionSize(), p.offsetLag(), 
p.isFutureKey()));
                 }
             }
             result.put(logDirResult.logDir(), new LogDirDescription(
-                    Errors.forCode(logDirResult.errorCode()).exception(),
-                    replicaInfoMap,
-                    logDirResult.totalBytes(),
-                    logDirResult.usableBytes()));
+                Errors.forCode(logDirResult.errorCode()).exception(),
+                replicaInfoMap,
+                logDirResult.totalBytes(),
+                logDirResult.usableBytes()));
         }
         return result;
     }
@@ -3075,7 +3070,7 @@ public class KafkaAdminClient extends AdminClient {
 
         Map<Integer, DescribeLogDirsRequestData> partitionsByBroker = new 
HashMap<>();
 
-        for (TopicPartitionReplica replica: replicas) {
+        for (TopicPartitionReplica replica : replicas) {
             DescribeLogDirsRequestData requestData = 
partitionsByBroker.computeIfAbsent(replica.brokerId(),
                 brokerId -> new DescribeLogDirsRequestData());
             DescribableLogDirTopic describableLogDirTopic = 
requestData.topics().find(replica.topic());
@@ -3083,7 +3078,7 @@ public class KafkaAdminClient extends AdminClient {
                 List<Integer> partitions = new ArrayList<>();
                 partitions.add(replica.partition());
                 describableLogDirTopic = new 
DescribableLogDirTopic().setTopic(replica.topic())
-                        .setPartitions(partitions);
+                    .setPartitions(partitions);
                 requestData.topics().add(describableLogDirTopic);
             } else {
                 describableLogDirTopic.partitions().add(replica.partition());
@@ -3091,11 +3086,11 @@ public class KafkaAdminClient extends AdminClient {
         }
 
         final long now = time.milliseconds();
-        for (Map.Entry<Integer, DescribeLogDirsRequestData> entry: 
partitionsByBroker.entrySet()) {
+        for (Map.Entry<Integer, DescribeLogDirsRequestData> entry : 
partitionsByBroker.entrySet()) {
             final int brokerId = entry.getKey();
             final DescribeLogDirsRequestData topicPartitions = 
entry.getValue();
             final Map<TopicPartition, ReplicaLogDirInfo> 
replicaDirInfoByPartition = new HashMap<>();
-            for (DescribableLogDirTopic topicPartition: 
topicPartitions.topics()) {
+            for (DescribableLogDirTopic topicPartition : 
topicPartitions.topics()) {
                 for (Integer partitionId : topicPartition.partitions()) {
                     replicaDirInfoByPartition.put(new 
TopicPartition(topicPartition.topic(), partitionId), new ReplicaLogDirInfo());
                 }
@@ -3113,7 +3108,7 @@ public class KafkaAdminClient extends AdminClient {
                 @Override
                 public void handleResponse(AbstractResponse abstractResponse) {
                     DescribeLogDirsResponse response = 
(DescribeLogDirsResponse) abstractResponse;
-                    for (Map.Entry<String, LogDirDescription> responseEntry: 
logDirDescriptions(response).entrySet()) {
+                    for (Map.Entry<String, LogDirDescription> responseEntry : 
logDirDescriptions(response).entrySet()) {
                         String logDir = responseEntry.getKey();
                         LogDirDescription logDirInfo = 
responseEntry.getValue();
 
@@ -3124,7 +3119,7 @@ public class KafkaAdminClient extends AdminClient {
                             handleFailure(new IllegalStateException(
                                 "The error " + 
logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in 
the response from broker " + brokerId + " is illegal"));
 
-                        for (Map.Entry<TopicPartition, ReplicaInfo> 
replicaInfoEntry: logDirInfo.replicaInfos().entrySet()) {
+                        for (Map.Entry<TopicPartition, ReplicaInfo> 
replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) {
                             TopicPartition tp = replicaInfoEntry.getKey();
                             ReplicaInfo replicaInfo = 
replicaInfoEntry.getValue();
                             ReplicaLogDirInfo replicaLogDirInfo = 
replicaDirInfoByPartition.get(tp);
@@ -3132,24 +3127,25 @@ public class KafkaAdminClient extends AdminClient {
                                 log.warn("Server response from broker {} 
mentioned unknown partition {}", brokerId, tp);
                             } else if (replicaInfo.isFuture()) {
                                 replicaDirInfoByPartition.put(tp, new 
ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(),
-                                                                               
         replicaLogDirInfo.getCurrentReplicaOffsetLag(),
-                                                                               
         logDir,
-                                                                               
         replicaInfo.offsetLag()));
+                                    
replicaLogDirInfo.getCurrentReplicaOffsetLag(),
+                                    logDir,
+                                    replicaInfo.offsetLag()));
                             } else {
                                 replicaDirInfoByPartition.put(tp, new 
ReplicaLogDirInfo(logDir,
-                                                                               
         replicaInfo.offsetLag(),
-                                                                               
         replicaLogDirInfo.getFutureReplicaLogDir(),
-                                                                               
         replicaLogDirInfo.getFutureReplicaOffsetLag()));
+                                    replicaInfo.offsetLag(),
+                                    replicaLogDirInfo.getFutureReplicaLogDir(),
+                                    
replicaLogDirInfo.getFutureReplicaOffsetLag()));
                             }
                         }
                     }
 
-                    for (Map.Entry<TopicPartition, ReplicaLogDirInfo> entry: 
replicaDirInfoByPartition.entrySet()) {
+                    for (Map.Entry<TopicPartition, ReplicaLogDirInfo> entry : 
replicaDirInfoByPartition.entrySet()) {
                         TopicPartition tp = entry.getKey();
                         KafkaFutureImpl<ReplicaLogDirInfo> future = 
futures.get(new TopicPartitionReplica(tp.topic(), tp.partition(), brokerId));
                         future.complete(entry.getValue());
                     }
                 }
+
                 @Override
                 void handleFailure(Throwable throwable) {
                     completeAllExceptionally(futures.values(), throwable);
@@ -3285,8 +3281,8 @@ public class KafkaAdminClient extends AdminClient {
         List<CreatableRenewers> renewers = new ArrayList<>();
         for (KafkaPrincipal principal : options.renewers()) {
             renewers.add(new CreatableRenewers()
-                    .setPrincipalName(principal.getName())
-                    .setPrincipalType(principal.getPrincipalType()));
+                .setPrincipalName(principal.getName())
+                .setPrincipalType(principal.getPrincipalType()));
         }
         runnable.call(new Call("createDelegationToken", calcDeadlineMs(now, 
options.timeoutMs()),
             new LeastLoadedNodeProvider()) {
@@ -3310,7 +3306,7 @@ public class KafkaAdminClient extends AdminClient {
                     
delegationTokenFuture.completeExceptionally(response.error().exception());
                 } else {
                     CreateDelegationTokenResponseData data = response.data();
-                    TokenInformation tokenInfo =  new 
TokenInformation(data.tokenId(), new KafkaPrincipal(data.principalType(), 
data.principalName()),
+                    TokenInformation tokenInfo = new 
TokenInformation(data.tokenId(), new KafkaPrincipal(data.principalType(), 
data.principalName()),
                         new KafkaPrincipal(data.tokenRequesterPrincipalType(), 
data.tokenRequesterPrincipalName()),
                         options.renewers(), data.issueTimestampMs(), 
data.maxTimestampMs(), data.expiryTimestampMs());
                     DelegationToken token = new DelegationToken(tokenInfo, 
data.hmac());
@@ -3329,7 +3325,7 @@ public class KafkaAdminClient extends AdminClient {
 
     @Override
     public RenewDelegationTokenResult renewDelegationToken(final byte[] hmac, 
final RenewDelegationTokenOptions options) {
-        final KafkaFutureImpl<Long>  expiryTimeFuture = new 
KafkaFutureImpl<>();
+        final KafkaFutureImpl<Long> expiryTimeFuture = new KafkaFutureImpl<>();
         final long now = time.milliseconds();
         runnable.call(new Call("renewDelegationToken", calcDeadlineMs(now, 
options.timeoutMs()),
             new LeastLoadedNodeProvider()) {
@@ -3337,7 +3333,7 @@ public class KafkaAdminClient extends AdminClient {
             @Override
             RenewDelegationTokenRequest.Builder createRequest(int timeoutMs) {
                 return new RenewDelegationTokenRequest.Builder(
-                        new RenewDelegationTokenRequestData()
+                    new RenewDelegationTokenRequestData()
                         .setHmac(hmac)
                         .setRenewPeriodMs(options.renewTimePeriodMs()));
             }
@@ -3363,7 +3359,7 @@ public class KafkaAdminClient extends AdminClient {
 
     @Override
     public ExpireDelegationTokenResult expireDelegationToken(final byte[] 
hmac, final ExpireDelegationTokenOptions options) {
-        final KafkaFutureImpl<Long>  expiryTimeFuture = new 
KafkaFutureImpl<>();
+        final KafkaFutureImpl<Long> expiryTimeFuture = new KafkaFutureImpl<>();
         final long now = time.milliseconds();
         runnable.call(new Call("expireDelegationToken", calcDeadlineMs(now, 
options.timeoutMs()),
             new LeastLoadedNodeProvider()) {
@@ -3371,9 +3367,9 @@ public class KafkaAdminClient extends AdminClient {
             @Override
             ExpireDelegationTokenRequest.Builder createRequest(int timeoutMs) {
                 return new ExpireDelegationTokenRequest.Builder(
-                        new ExpireDelegationTokenRequestData()
-                            .setHmac(hmac)
-                            
.setExpiryTimePeriodMs(options.expiryTimePeriodMs()));
+                    new ExpireDelegationTokenRequestData()
+                        .setHmac(hmac)
+                        .setExpiryTimePeriodMs(options.expiryTimePeriodMs()));
             }
 
             @Override
@@ -3397,7 +3393,7 @@ public class KafkaAdminClient extends AdminClient {
 
     @Override
     public DescribeDelegationTokenResult describeDelegationToken(final 
DescribeDelegationTokenOptions options) {
-        final KafkaFutureImpl<List<DelegationToken>>  tokensFuture = new 
KafkaFutureImpl<>();
+        final KafkaFutureImpl<List<DelegationToken>> tokensFuture = new 
KafkaFutureImpl<>();
         final long now = time.milliseconds();
         runnable.call(new Call("describeDelegationToken", calcDeadlineMs(now, 
options.timeoutMs()),
             new LeastLoadedNodeProvider()) {
@@ -3579,11 +3575,11 @@ public class KafkaAdminClient extends AdminClient {
     public DescribeConsumerGroupsResult describeConsumerGroups(final 
Collection<String> groupIds,
                                                                final 
DescribeConsumerGroupsOptions options) {
         SimpleAdminApiFuture<CoordinatorKey, ConsumerGroupDescription> future =
-                DescribeConsumerGroupsHandler.newFuture(groupIds);
+            DescribeConsumerGroupsHandler.newFuture(groupIds);
         DescribeConsumerGroupsHandler handler = new 
DescribeConsumerGroupsHandler(options.includeAuthorizedOperations(), 
logContext);
         invokeDriver(handler, future, options.timeoutMs);
         return new 
DescribeConsumerGroupsResult(future.all().entrySet().stream()
-                .collect(Collectors.toMap(entry -> entry.getKey().idValue, 
Map.Entry::getValue)));
+            .collect(Collectors.toMap(entry -> entry.getKey().idValue, 
Map.Entry::getValue)));
     }
 
     @Deprecated
@@ -3660,13 +3656,13 @@ public class KafkaAdminClient extends AdminClient {
                         @Override
                         ListGroupsRequest.Builder createRequest(int timeoutMs) 
{
                             List<String> states = options.groupStates()
-                                    .stream()
-                                    .map(GroupState::toString)
-                                    .collect(Collectors.toList());
+                                .stream()
+                                .map(GroupState::toString)
+                                .collect(Collectors.toList());
                             List<String> groupTypes = options.types()
-                                    .stream()
-                                    .map(GroupType::toString)
-                                    .collect(Collectors.toList());
+                                .stream()
+                                .map(GroupType::toString)
+                                .collect(Collectors.toList());
                             return new ListGroupsRequest.Builder(new 
ListGroupsRequestData()
                                 .setStatesFilter(states)
                                 .setTypesFilter(groupTypes)
@@ -3678,17 +3674,17 @@ public class KafkaAdminClient extends AdminClient {
                             if 
(protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) 
{
                                 final String groupId = group.groupId();
                                 final Optional<GroupState> groupState = 
group.groupState().isEmpty()
-                                        ? Optional.empty()
-                                        : 
Optional.of(GroupState.parse(group.groupState()));
+                                    ? Optional.empty()
+                                    : 
Optional.of(GroupState.parse(group.groupState()));
                                 final Optional<GroupType> type = 
group.groupType().isEmpty()
-                                        ? Optional.empty()
-                                        : 
Optional.of(GroupType.parse(group.groupType()));
+                                    ? Optional.empty()
+                                    : 
Optional.of(GroupType.parse(group.groupType()));
                                 final ConsumerGroupListing groupListing = new 
ConsumerGroupListing(
-                                        groupId,
-                                        groupState,
-                                        type,
-                                        protocolType.isEmpty()
-                                    );
+                                    groupId,
+                                    groupState,
+                                    type,
+                                    protocolType.isEmpty()
+                                );
                                 results.addListing(groupListing);
                             }
                         }
@@ -3736,7 +3732,7 @@ public class KafkaAdminClient extends AdminClient {
     public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, 
ListConsumerGroupOffsetsSpec> groupSpecs,
                                                                    
ListConsumerGroupOffsetsOptions options) {
         SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, 
OffsetAndMetadata>> future =
-                ListConsumerGroupOffsetsHandler.newFuture(groupSpecs.keySet());
+            ListConsumerGroupOffsetsHandler.newFuture(groupSpecs.keySet());
         ListConsumerGroupOffsetsHandler handler =
             new ListConsumerGroupOffsetsHandler(groupSpecs, 
options.requireStable(), logContext);
         invokeDriver(handler, future, options.timeoutMs);
@@ -3745,7 +3741,7 @@ public class KafkaAdminClient extends AdminClient {
 
     @Override
     public ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, 
ListStreamsGroupOffsetsSpec> groupSpecs,
-                                                                   
ListStreamsGroupOffsetsOptions options) {
+                                                                 
ListStreamsGroupOffsetsOptions options) {
         Map<String, ListConsumerGroupOffsetsSpec> consumerGroupSpecs = 
groupSpecs.entrySet().stream()
             .collect(Collectors.toMap(
                 Map.Entry::getKey,
@@ -3760,11 +3756,11 @@ public class KafkaAdminClient extends AdminClient {
     @Override
     public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> 
groupIds, DeleteConsumerGroupsOptions options) {
         SimpleAdminApiFuture<CoordinatorKey, Void> future =
-                DeleteConsumerGroupsHandler.newFuture(groupIds);
+            DeleteConsumerGroupsHandler.newFuture(groupIds);
         DeleteConsumerGroupsHandler handler = new 
DeleteConsumerGroupsHandler(logContext);
         invokeDriver(handler, future, options.timeoutMs);
         return new DeleteConsumerGroupsResult(future.all().entrySet().stream()
-                .collect(Collectors.toMap(entry -> entry.getKey().idValue, 
Map.Entry::getValue)));
+            .collect(Collectors.toMap(entry -> entry.getKey().idValue, 
Map.Entry::getValue)));
     }
 
     @Override
@@ -3776,11 +3772,11 @@ public class KafkaAdminClient extends AdminClient {
 
     @Override
     public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(
-            String groupId,
-            Set<TopicPartition> partitions,
-            DeleteConsumerGroupOffsetsOptions options) {
+        String groupId,
+        Set<TopicPartition> partitions,
+        DeleteConsumerGroupOffsetsOptions options) {
         SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>> 
future =
-                DeleteConsumerGroupOffsetsHandler.newFuture(groupId);
+            DeleteConsumerGroupOffsetsHandler.newFuture(groupId);
         DeleteConsumerGroupOffsetsHandler handler = new 
DeleteConsumerGroupOffsetsHandler(groupId, partitions, logContext);
         invokeDriver(handler, future, options.timeoutMs);
         return new 
DeleteConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)), 
partitions);
@@ -3800,11 +3796,11 @@ public class KafkaAdminClient extends AdminClient {
     public DescribeShareGroupsResult describeShareGroups(final 
Collection<String> groupIds,
                                                          final 
DescribeShareGroupsOptions options) {
         SimpleAdminApiFuture<CoordinatorKey, ShareGroupDescription> future =
-                DescribeShareGroupsHandler.newFuture(groupIds);
+            DescribeShareGroupsHandler.newFuture(groupIds);
         DescribeShareGroupsHandler handler = new 
DescribeShareGroupsHandler(options.includeAuthorizedOperations(), logContext);
         invokeDriver(handler, future, options.timeoutMs);
         return new DescribeShareGroupsResult(future.all().entrySet().stream()
-                .collect(Collectors.toMap(entry -> entry.getKey().idValue, 
Map.Entry::getValue)));
+            .collect(Collectors.toMap(entry -> entry.getKey().idValue, 
Map.Entry::getValue)));
     }
 
     @Override
@@ -3871,13 +3867,13 @@ public class KafkaAdminClient extends AdminClient {
 
     @Override
     public ElectLeadersResult electLeaders(
-            final ElectionType electionType,
-            final Set<TopicPartition> topicPartitions,
-            ElectLeadersOptions options) {
+        final ElectionType electionType,
+        final Set<TopicPartition> topicPartitions,
+        ElectLeadersOptions options) {
         final KafkaFutureImpl<Map<TopicPartition, Optional<Throwable>>> 
electionFuture = new KafkaFutureImpl<>();
         final long now = time.milliseconds();
         runnable.call(new Call("electLeaders", calcDeadlineMs(now, 
options.timeoutMs()),
-                new ControllerNodeProvider()) {
+            new ControllerNodeProvider()) {
 
             @Override
             public ElectLeadersRequest.Builder createRequest(int timeoutMs) {
@@ -3910,8 +3906,8 @@ public class KafkaAdminClient extends AdminClient {
 
     @Override
     public AlterPartitionReassignmentsResult alterPartitionReassignments(
-            Map<TopicPartition, Optional<NewPartitionReassignment>> 
reassignments,
-            AlterPartitionReassignmentsOptions options) {
+        Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments,
+        AlterPartitionReassignmentsOptions options) {
         final Map<TopicPartition, KafkaFutureImpl<Void>> futures = new 
HashMap<>();
         final Map<String, Map<Integer, Optional<NewPartitionReassignment>>> 
topicsToReassignments = new TreeMap<>();
         for (Map.Entry<TopicPartition, Optional<NewPartitionReassignment>> 
entry : reassignments.entrySet()) {
@@ -3924,13 +3920,13 @@ public class KafkaAdminClient extends AdminClient {
 
             if (topicNameIsUnrepresentable(topic)) {
                 future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
-                        topic + "' cannot be represented in a request."));
+                    topic + "' cannot be represented in a request."));
             } else if (topicPartition.partition() < 0) {
                 future.completeExceptionally(new InvalidTopicException("The 
given partition index " +
-                        topicPartition.partition() + " is not valid."));
+                    topicPartition.partition() + " is not valid."));
             } else {
                 Map<Integer, Optional<NewPartitionReassignment>> 
partitionReassignments =
-                        topicsToReassignments.get(topicPartition.topic());
+                    topicsToReassignments.get(topicPartition.topic());
                 if (partitionReassignments == null) {
                     partitionReassignments = new TreeMap<>();
                     topicsToReassignments.put(topic, partitionReassignments);
@@ -3942,32 +3938,32 @@ public class KafkaAdminClient extends AdminClient {
 
         final long now = time.milliseconds();
         Call call = new Call("alterPartitionReassignments", 
calcDeadlineMs(now, options.timeoutMs()),
-                new ControllerNodeProvider(true)) {
+            new ControllerNodeProvider(true)) {
 
             @Override
             public AlterPartitionReassignmentsRequest.Builder 
createRequest(int timeoutMs) {
                 AlterPartitionReassignmentsRequestData data =
-                        new AlterPartitionReassignmentsRequestData();
+                    new AlterPartitionReassignmentsRequestData();
                 for (Map.Entry<String, Map<Integer, 
Optional<NewPartitionReassignment>>> entry :
-                        topicsToReassignments.entrySet()) {
+                    topicsToReassignments.entrySet()) {
                     String topicName = entry.getKey();
                     Map<Integer, Optional<NewPartitionReassignment>> 
partitionsToReassignments = entry.getValue();
 
                     List<ReassignablePartition> reassignablePartitions = new 
ArrayList<>();
                     for (Map.Entry<Integer, 
Optional<NewPartitionReassignment>> partitionEntry :
-                            partitionsToReassignments.entrySet()) {
+                        partitionsToReassignments.entrySet()) {
                         int partitionIndex = partitionEntry.getKey();
                         Optional<NewPartitionReassignment> reassignment = 
partitionEntry.getValue();
 
                         ReassignablePartition reassignablePartition = new 
ReassignablePartition()
-                                .setPartitionIndex(partitionIndex)
-                                
.setReplicas(reassignment.map(NewPartitionReassignment::targetReplicas).orElse(null));
+                            .setPartitionIndex(partitionIndex)
+                            
.setReplicas(reassignment.map(NewPartitionReassignment::targetReplicas).orElse(null));
                         reassignablePartitions.add(reassignablePartition);
                     }
 
                     ReassignableTopic reassignableTopic = new 
ReassignableTopic()
-                            .setName(topicName)
-                            .setPartitions(reassignablePartitions);
+                        .setName(topicName)
+                        .setPartitions(reassignablePartitions);
                     data.topics().add(reassignableTopic);
                 }
                 data.setTimeoutMs(timeoutMs);
@@ -3994,8 +3990,8 @@ public class KafkaAdminClient extends AdminClient {
                             String topicName = topicResponse.name();
                             for (ReassignablePartitionResponse partition : 
topicResponse.partitions()) {
                                 errors.put(
-                                        new TopicPartition(topicName, 
partition.partitionIndex()),
-                                        new ApiError(topLevelError, 
response.data().errorMessage()).exception()
+                                    new TopicPartition(topicName, 
partition.partitionIndex()),
+                                    new ApiError(topLevelError, 
response.data().errorMessage()).exception()
                                 );
                                 receivedResponsesCount += 1;
                             }
@@ -4067,10 +4063,10 @@ public class KafkaAdminClient extends AdminClient {
                 int partition = tp.partition();
                 if (topicNameIsUnrepresentable(topic)) {
                     partitionReassignmentsFuture.completeExceptionally(new 
InvalidTopicException("The given topic name '"
-                            + topic + "' cannot be represented in a 
request."));
+                        + topic + "' cannot be represented in a request."));
                 } else if (partition < 0) {
                     partitionReassignmentsFuture.completeExceptionally(new 
InvalidTopicException("The given partition index " +
-                            partition + " is not valid."));
+                        partition + " is not valid."));
                 }
                 if (partitionReassignmentsFuture.isCompletedExceptionally())
                     return new 
ListPartitionReassignmentsResult(partitionReassignmentsFuture);
@@ -4159,7 +4155,7 @@ public class KafkaAdminClient extends AdminClient {
      */
     private Integer nodeFor(ConfigResource resource) {
         if ((resource.type() == ConfigResource.Type.BROKER && 
!resource.isDefault())
-                || resource.type() == ConfigResource.Type.BROKER_LOGGER) {
+            || resource.type() == ConfigResource.Type.BROKER_LOGGER) {
             return Integer.valueOf(resource.name());
         } else {
             return null;
@@ -4175,8 +4171,8 @@ public class KafkaAdminClient extends AdminClient {
             } else {
                 List<MemberIdentity> membersToRemove = 
res.members().stream().map(member ->
                     member.groupInstanceId().map(id -> new 
MemberIdentity().setGroupInstanceId(id))
-                    .orElseGet(() -> new 
MemberIdentity().setMemberId(member.consumerId()))
-                    .setReason(reason)
+                        .orElseGet(() -> new 
MemberIdentity().setMemberId(member.consumerId()))
+                        .setReason(reason)
                 ).collect(Collectors.toList());
 
                 future.complete(membersToRemove);
@@ -4209,7 +4205,7 @@ public class KafkaAdminClient extends AdminClient {
             DEFAULT_LEAVE_GROUP_REASON : 
JoinGroupRequest.maybeTruncateReason(options.reason());
 
         final SimpleAdminApiFuture<CoordinatorKey, Map<MemberIdentity, 
Errors>> adminFuture =
-                RemoveMembersFromConsumerGroupHandler.newFuture(groupId);
+            RemoveMembersFromConsumerGroupHandler.newFuture(groupId);
 
         KafkaFutureImpl<List<MemberIdentity>> memFuture;
         if (options.removeAll()) {
@@ -4217,8 +4213,8 @@ public class KafkaAdminClient extends AdminClient {
         } else {
             memFuture = new KafkaFutureImpl<>();
             memFuture.complete(options.members().stream()
-                    .map(m -> m.toMemberIdentity().setReason(reason))
-                    .collect(Collectors.toList()));
+                .map(m -> m.toMemberIdentity().setReason(reason))
+                .collect(Collectors.toList()));
         }
 
         memFuture.whenComplete((members, ex) -> {
@@ -4240,7 +4236,7 @@ public class KafkaAdminClient extends AdminClient {
         AlterConsumerGroupOffsetsOptions options
     ) {
         SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>> 
future =
-                AlterConsumerGroupOffsetsHandler.newFuture(groupId);
+            AlterConsumerGroupOffsetsHandler.newFuture(groupId);
         AlterConsumerGroupOffsetsHandler handler = new 
AlterConsumerGroupOffsetsHandler(groupId, offsets, logContext);
         invokeDriver(handler, future, options.timeoutMs);
         return new 
AlterConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)));
@@ -4275,24 +4271,24 @@ public class KafkaAdminClient extends AdminClient {
 
         final long now = time.milliseconds();
         runnable.call(new Call("describeClientQuotas", calcDeadlineMs(now, 
options.timeoutMs()),
-                new LeastLoadedNodeProvider()) {
+            new LeastLoadedNodeProvider()) {
 
-                @Override
-                DescribeClientQuotasRequest.Builder createRequest(int 
timeoutMs) {
-                    return new DescribeClientQuotasRequest.Builder(filter);
-                }
+            @Override
+            DescribeClientQuotasRequest.Builder createRequest(int timeoutMs) {
+                return new DescribeClientQuotasRequest.Builder(filter);
+            }
 
-                @Override
-                void handleResponse(AbstractResponse abstractResponse) {
-                    DescribeClientQuotasResponse response = 
(DescribeClientQuotasResponse) abstractResponse;
-                    response.complete(future);
-                }
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                DescribeClientQuotasResponse response = 
(DescribeClientQuotasResponse) abstractResponse;
+                response.complete(future);
+            }
 
-                @Override
-                void handleFailure(Throwable throwable) {
-                    future.completeExceptionally(throwable);
-                }
-            }, now);
+            @Override
+            void handleFailure(Throwable throwable) {
+                future.completeExceptionally(throwable);
+            }
+        }, now);
 
         return new DescribeClientQuotasResult(future);
     }
@@ -4306,24 +4302,24 @@ public class KafkaAdminClient extends AdminClient {
 
         final long now = time.milliseconds();
         runnable.call(new Call("alterClientQuotas", calcDeadlineMs(now, 
options.timeoutMs()),
-                new LeastLoadedNodeProvider()) {
+            new LeastLoadedNodeProvider()) {
 
-                @Override
-                AlterClientQuotasRequest.Builder createRequest(int timeoutMs) {
-                    return new AlterClientQuotasRequest.Builder(entries, 
options.validateOnly());
-                }
+            @Override
+            AlterClientQuotasRequest.Builder createRequest(int timeoutMs) {
+                return new AlterClientQuotasRequest.Builder(entries, 
options.validateOnly());
+            }
 
-                @Override
-                void handleResponse(AbstractResponse abstractResponse) {
-                    AlterClientQuotasResponse response = 
(AlterClientQuotasResponse) abstractResponse;
-                    response.complete(futures);
-                }
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                AlterClientQuotasResponse response = 
(AlterClientQuotasResponse) abstractResponse;
+                response.complete(futures);
+            }
 
-                @Override
-                void handleFailure(Throwable throwable) {
-                    completeAllExceptionally(futures.values(), throwable);
-                }
-            }, now);
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(futures.values(), throwable);
+            }
+        }, now);
 
         return new 
AlterClientQuotasResult(Collections.unmodifiableMap(futures));
     }
@@ -4333,7 +4329,7 @@ public class KafkaAdminClient extends AdminClient {
         final KafkaFutureImpl<DescribeUserScramCredentialsResponseData> 
dataFuture = new KafkaFutureImpl<>();
         final long now = time.milliseconds();
         Call call = new Call("describeUserScramCredentials", 
calcDeadlineMs(now, options.timeoutMs()),
-                new LeastLoadedNodeProvider()) {
+            new LeastLoadedNodeProvider()) {
             @Override
             public DescribeUserScramCredentialsRequest.Builder 
createRequest(final int timeoutMs) {
                 final DescribeUserScramCredentialsRequestData requestData = 
new DescribeUserScramCredentialsRequestData();
@@ -4379,7 +4375,7 @@ public class KafkaAdminClient extends AdminClient {
                                                                      
AlterUserScramCredentialsOptions options) {
         final long now = time.milliseconds();
         final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>();
-        for (UserScramCredentialAlteration alteration: alterations) {
+        for (UserScramCredentialAlteration alteration : alterations) {
             futures.put(alteration.user(), new KafkaFutureImpl<>());
         }
         final Map<String, Exception> userIllegalAlterationExceptions = new 
HashMap<>();
@@ -4403,55 +4399,55 @@ public class KafkaAdminClient extends AdminClient {
         // so keep track of which users are affected by such a failure so we 
can fail all their alterations later
         final Map<String, Map<ScramMechanism, 
AlterUserScramCredentialsRequestData.ScramCredentialUpsertion>> userInsertions 
= new HashMap<>();
         alterations.stream().filter(a -> a instanceof 
UserScramCredentialUpsertion)
-                .filter(alteration -> 
!userIllegalAlterationExceptions.containsKey(alteration.user()))
-                .forEach(alteration -> {
-                    final String user = alteration.user();
-                    if (user == null || user.isEmpty()) {
-                        userIllegalAlterationExceptions.put(alteration.user(), 
new UnacceptableCredentialException(usernameMustNotBeEmptyMsg));
-                    } else {
-                        UserScramCredentialUpsertion upsertion = 
(UserScramCredentialUpsertion) alteration;
-                        try {
-                            byte[] password = upsertion.password();
-                            if (password == null || password.length == 0) {
-                                userIllegalAlterationExceptions.put(user, new 
UnacceptableCredentialException(passwordMustNotBeEmptyMsg));
+            .filter(alteration -> 
!userIllegalAlterationExceptions.containsKey(alteration.user()))
+            .forEach(alteration -> {
+                final String user = alteration.user();
+                if (user == null || user.isEmpty()) {
+                    userIllegalAlterationExceptions.put(alteration.user(), new 
UnacceptableCredentialException(usernameMustNotBeEmptyMsg));
+                } else {
+                    UserScramCredentialUpsertion upsertion = 
(UserScramCredentialUpsertion) alteration;
+                    try {
+                        byte[] password = upsertion.password();
+                        if (password == null || password.length == 0) {
+                            userIllegalAlterationExceptions.put(user, new 
UnacceptableCredentialException(passwordMustNotBeEmptyMsg));
+                        } else {
+                            ScramMechanism mechanism = 
upsertion.credentialInfo().mechanism();
+                            if (mechanism == null || mechanism == 
ScramMechanism.UNKNOWN) {
+                                userIllegalAlterationExceptions.put(user, new 
UnsupportedSaslMechanismException(unknownScramMechanismMsg));
                             } else {
-                                ScramMechanism mechanism = 
upsertion.credentialInfo().mechanism();
-                                if (mechanism == null || mechanism == 
ScramMechanism.UNKNOWN) {
-                                    userIllegalAlterationExceptions.put(user, 
new UnsupportedSaslMechanismException(unknownScramMechanismMsg));
-                                } else {
-                                    userInsertions.putIfAbsent(user, new 
HashMap<>());
-                                    userInsertions.get(user).put(mechanism, 
getScramCredentialUpsertion(upsertion));
-                                }
+                                userInsertions.putIfAbsent(user, new 
HashMap<>());
+                                userInsertions.get(user).put(mechanism, 
getScramCredentialUpsertion(upsertion));
                             }
-                        } catch (NoSuchAlgorithmException e) {
-                            // we might overwrite an exception from a previous 
alteration, but we don't really care
-                            // since we just need to mark this user as having 
at least one illegal alteration
-                            // and make an exception instance available for 
completing the corresponding future exceptionally
-                            userIllegalAlterationExceptions.put(user, new 
UnsupportedSaslMechanismException(unknownScramMechanismMsg));
-                        } catch (InvalidKeyException e) {
-                            // generally shouldn't happen since we deal with 
the empty password case above,
-                            // but we still need to catch/handle it
-                            userIllegalAlterationExceptions.put(user, new 
UnacceptableCredentialException(e.getMessage(), e));
                         }
+                    } catch (NoSuchAlgorithmException e) {
+                        // we might overwrite an exception from a previous 
alteration, but we don't really care
+                        // since we just need to mark this user as having at 
least one illegal alteration
+                        // and make an exception instance available for 
completing the corresponding future exceptionally
+                        userIllegalAlterationExceptions.put(user, new 
UnsupportedSaslMechanismException(unknownScramMechanismMsg));
+                    } catch (InvalidKeyException e) {
+                        // generally shouldn't happen since we deal with the 
empty password case above,
+                        // but we still need to catch/handle it
+                        userIllegalAlterationExceptions.put(user, new 
UnacceptableCredentialException(e.getMessage(), e));
                     }
-                });
+                }
+            });
 
         // submit alterations only for users that do not have an illegal 
alteration as identified above
         Call call = new Call("alterUserScramCredentials", calcDeadlineMs(now, 
options.timeoutMs()),
-                new ControllerNodeProvider()) {
+            new ControllerNodeProvider()) {
             @Override
             public AlterUserScramCredentialsRequest.Builder createRequest(int 
timeoutMs) {
                 return new AlterUserScramCredentialsRequest.Builder(
-                        new 
AlterUserScramCredentialsRequestData().setUpsertions(alterations.stream()
-                                .filter(a -> a instanceof 
UserScramCredentialUpsertion)
-                                .filter(a -> 
!userIllegalAlterationExceptions.containsKey(a.user()))
-                                .map(a -> 
userInsertions.get(a.user()).get(((UserScramCredentialUpsertion) 
a).credentialInfo().mechanism()))
-                                .collect(Collectors.toList()))
+                    new 
AlterUserScramCredentialsRequestData().setUpsertions(alterations.stream()
+                            .filter(a -> a instanceof 
UserScramCredentialUpsertion)
+                            .filter(a -> 
!userIllegalAlterationExceptions.containsKey(a.user()))
+                            .map(a -> 
userInsertions.get(a.user()).get(((UserScramCredentialUpsertion) 
a).credentialInfo().mechanism()))
+                            .collect(Collectors.toList()))
                         .setDeletions(alterations.stream()
-                                .filter(a -> a instanceof 
UserScramCredentialDeletion)
-                                .filter(a -> 
!userIllegalAlterationExceptions.containsKey(a.user()))
-                                .map(d -> 
getScramCredentialDeletion((UserScramCredentialDeletion) d))
-                                .collect(Collectors.toList())));
+                            .filter(a -> a instanceof 
UserScramCredentialDeletion)
+                            .filter(a -> 
!userIllegalAlterationExceptions.containsKey(a.user()))
+                            .map(d -> 
getScramCredentialDeletion((UserScramCredentialDeletion) d))
+                            .collect(Collectors.toList())));
             }
 
             @Override
@@ -4501,10 +4497,10 @@ public class KafkaAdminClient extends AdminClient {
     private static 
AlterUserScramCredentialsRequestData.ScramCredentialUpsertion 
getScramCredentialUpsertion(UserScramCredentialUpsertion u) throws 
InvalidKeyException, NoSuchAlgorithmException {
         AlterUserScramCredentialsRequestData.ScramCredentialUpsertion retval = 
new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion();
         return retval.setName(u.user())
-                .setMechanism(u.credentialInfo().mechanism().type())
-                .setIterations(u.credentialInfo().iterations())
-                .setSalt(u.salt())
-                
.setSaltedPassword(getSaltedPassword(u.credentialInfo().mechanism(), 
u.password(), u.salt(), u.credentialInfo().iterations()));
+            .setMechanism(u.credentialInfo().mechanism().type())
+            .setIterations(u.credentialInfo().iterations())
+            .setSalt(u.salt())
+            
.setSaltedPassword(getSaltedPassword(u.credentialInfo().mechanism(), 
u.password(), u.salt(), u.credentialInfo().iterations()));
     }
 
     private static 
AlterUserScramCredentialsRequestData.ScramCredentialDeletion 
getScramCredentialDeletion(UserScramCredentialDeletion d) {
@@ -4513,7 +4509,7 @@ public class KafkaAdminClient extends AdminClient {
 
     private static byte[] getSaltedPassword(ScramMechanism 
publicScramMechanism, byte[] password, byte[] salt, int iterations) throws 
NoSuchAlgorithmException, InvalidKeyException {
         return new 
ScramFormatter(org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(publicScramMechanism.mechanismName()))
-                .hi(password, salt, iterations);
+            .hi(password, salt, iterations);
     }
 
     @Override
@@ -4639,7 +4635,7 @@ public class KafkaAdminClient extends AdminClient {
                             }
                             // The server should send back a response for 
every feature, but we do a sanity check anyway.
                             
completeUnrealizedFutures(updateFutures.entrySet().stream(),
-                                    feature -> "The controller response did 
not contain a result for feature " + feature);
+                                feature -> "The controller response did not 
contain a result for feature " + feature);
                         }
                         break;
                     case NOT_CONTROLLER:
@@ -4670,15 +4666,15 @@ public class KafkaAdminClient extends AdminClient {
         final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
         final long now = time.milliseconds();
         final Call call = new Call(
-                "describeMetadataQuorum", calcDeadlineMs(now, 
options.timeoutMs()), provider) {
+            "describeMetadataQuorum", calcDeadlineMs(now, 
options.timeoutMs()), provider) {
 
             private QuorumInfo.ReplicaState 
translateReplicaState(DescribeQuorumResponseData.ReplicaState replica) {
                 return new QuorumInfo.ReplicaState(
-                        replica.replicaId(),
-                        replica.replicaDirectoryId() == null ? Uuid.ZERO_UUID 
: replica.replicaDirectoryId(),
-                        replica.logEndOffset(),
-                        replica.lastFetchTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(replica.lastFetchTimestamp()),
-                        replica.lastCaughtUpTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(replica.lastCaughtUpTimestamp()));
+                    replica.replicaId(),
+                    replica.replicaDirectoryId() == null ? Uuid.ZERO_UUID : 
replica.replicaDirectoryId(),
+                    replica.logEndOffset(),
+                    replica.lastFetchTimestamp() == -1 ? OptionalLong.empty() 
: OptionalLong.of(replica.lastFetchTimestamp()),
+                    replica.lastCaughtUpTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(replica.lastCaughtUpTimestamp()));
             }
 
             private QuorumInfo createQuorumResult(final 
DescribeQuorumResponseData.PartitionData partition, 
DescribeQuorumResponseData.NodeCollection nodeCollection) {
@@ -4711,7 +4707,7 @@ public class KafkaAdminClient extends AdminClient {
             @Override
             DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
                 return new Builder(DescribeQuorumRequest.singletonRequest(
-                        new TopicPartition(CLUSTER_METADATA_TOPIC_NAME, 
CLUSTER_METADATA_TOPIC_PARTITION.partition())));
+                    new TopicPartition(CLUSTER_METADATA_TOPIC_NAME, 
CLUSTER_METADATA_TOPIC_PARTITION.partition())));
             }
 
             @Override
@@ -4723,27 +4719,27 @@ public class KafkaAdminClient extends AdminClient {
                 }
                 if (quorumResponse.data().topics().size() != 1) {
                     String msg = String.format("DescribeMetadataQuorum 
received %d topics when 1 was expected",
-                            quorumResponse.data().topics().size());
+                        quorumResponse.data().topics().size());
                     log.debug(msg);
                     throw new UnknownServerException(msg);
                 }
                 DescribeQuorumResponseData.TopicData topic = 
quorumResponse.data().topics().get(0);
                 if (!topic.topicName().equals(CLUSTER_METADATA_TOPIC_NAME)) {
                     String msg = String.format("DescribeMetadataQuorum 
received a topic with name %s when %s was expected",
-                            topic.topicName(), CLUSTER_METADATA_TOPIC_NAME);
+                        topic.topicName(), CLUSTER_METADATA_TOPIC_NAME);
                     log.debug(msg);
                     throw new UnknownServerException(msg);
                 }
                 if (topic.partitions().size() != 1) {
                     String msg = String.format("DescribeMetadataQuorum 
received a topic %s with %d partitions when 1 was expected",
-                            topic.topicName(), topic.partitions().size());
+                        topic.topicName(), topic.partitions().size());
                     log.debug(msg);
                     throw new UnknownServerException(msg);
                 }
                 DescribeQuorumResponseData.PartitionData partition = 
topic.partitions().get(0);
                 if (partition.partitionIndex() != 
CLUSTER_METADATA_TOPIC_PARTITION.partition()) {
                     String msg = String.format("DescribeMetadataQuorum 
received a single partition with index %d when %d was expected",
-                            partition.partitionIndex(), 
CLUSTER_METADATA_TOPIC_PARTITION.partition());
+                        partition.partitionIndex(), 
CLUSTER_METADATA_TOPIC_PARTITION.partition());
                     log.debug(msg);
                     throw new UnknownServerException(msg);
                 }
@@ -4768,19 +4764,19 @@ public class KafkaAdminClient extends AdminClient {
         final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
         final long now = time.milliseconds();
         final Call call = new Call("unregisterBroker", calcDeadlineMs(now, 
options.timeoutMs()),
-                new LeastLoadedBrokerOrActiveKController()) {
+            new LeastLoadedBrokerOrActiveKController()) {
 
             @Override
             UnregisterBrokerRequest.Builder createRequest(int timeoutMs) {
                 UnregisterBrokerRequestData data =
-                        new 
UnregisterBrokerRequestData().setBrokerId(brokerId);
+                    new UnregisterBrokerRequestData().setBrokerId(brokerId);
                 return new UnregisterBrokerRequest.Builder(data);
             }
 
             @Override
             void handleResponse(AbstractResponse abstractResponse) {
                 final UnregisterBrokerResponse response =
-                        (UnregisterBrokerResponse) abstractResponse;
+                    (UnregisterBrokerResponse) abstractResponse;
                 Errors error = Errors.forCode(response.data().errorCode());
                 switch (error) {
                     case NONE:
@@ -4840,7 +4836,7 @@ public class KafkaAdminClient extends AdminClient {
      * where a coordinator may need to unilaterally terminate a participant 
transaction that hasn't completed.
      * </p>
      *
-     * @param transactionalId       The transactional ID whose active 
transaction should be forcefully terminated.
+     * @param transactionalId The transactional ID whose active transaction 
should be forcefully terminated.
      * @return a {@link TerminateTransactionResult} that can be used to await 
the operation result.
      */
     @Override
@@ -4879,6 +4875,45 @@ public class KafkaAdminClient extends AdminClient {
         return new FenceProducersResult(future.all());
     }
 
+    @Override
+    public ListConfigResourcesResult listConfigResources(Set<ConfigResource.Type> configResourceTypes, ListConfigResourcesOptions options) {
+        final long now = time.milliseconds();
+        final KafkaFutureImpl<Collection<ConfigResource>> future = new KafkaFutureImpl<>();
+        final Call call = new Call("listConfigResources", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) {
+
+            @Override
+            ListConfigResourcesRequest.Builder createRequest(int timeoutMs) {
+                return new ListConfigResourcesRequest.Builder(
+                    new ListConfigResourcesRequestData()
+                        .setResourceTypes(
+                            configResourceTypes
+                                .stream()
+                                .map(ConfigResource.Type::id)
+                                .collect(Collectors.toList())
+                        )
+                );
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                ListConfigResourcesResponse response = (ListConfigResourcesResponse) abstractResponse;
+                if (response.error().isFailure()) {
+                    future.completeExceptionally(response.error().exception());
+                } else {
+                    future.complete(response.configResources());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                future.completeExceptionally(throwable);
+            }
+        };
+        runnable.call(call, now);
+        return new ListConfigResourcesResult(future);
+    }
+
+    @SuppressWarnings({"deprecation", "removal"})
     @Override
     public ListClientMetricsResourcesResult listClientMetricsResources(ListClientMetricsResourcesOptions options) {
         final long now = time.milliseconds();
@@ -4900,7 +4935,13 @@ public class KafkaAdminClient extends AdminClient {
                 if (response.error().isFailure()) {
                     future.completeExceptionally(response.error().exception());
                 } else {
-                    future.complete(response.clientMetricsResources());
+                    future.complete(response
+                        .data()
+                        .configResources()
+                        .stream()
+                        .filter(entry -> entry.resourceType() == ConfigResource.Type.CLIENT_METRICS.id())
+                        .map(entry -> new ClientMetricsResourceListing(entry.resourceName()))
+                        .collect(Collectors.toList()));
                 }
             }
 
@@ -4924,7 +4965,7 @@ public class KafkaAdminClient extends AdminClient {
         final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
         final long now = time.milliseconds();
         final Call call = new Call(
-                "addRaftVoter", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
+            "addRaftVoter", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
 
             @Override
             AddRaftVoterRequest.Builder createRequest(int timeoutMs) {
@@ -4936,12 +4977,12 @@ public class KafkaAdminClient extends AdminClient {
                         setHost(endpoint.host()).
                         setPort(endpoint.port())));
                 return new AddRaftVoterRequest.Builder(
-                        new AddRaftVoterRequestData().
-                            setClusterId(options.clusterId().orElse(null)).
-                            setTimeoutMs(timeoutMs).
-                            setVoterId(voterId) .
-                            setVoterDirectoryId(voterDirectoryId).
-                            setListeners(listeners));
+                    new AddRaftVoterRequestData().
+                        setClusterId(options.clusterId().orElse(null)).
+                        setTimeoutMs(timeoutMs).
+                        setVoterId(voterId).
+                        setVoterDirectoryId(voterDirectoryId).
+                        setListeners(listeners));
             }
 
             @Override
@@ -4978,14 +5019,14 @@ public class KafkaAdminClient extends AdminClient {
         final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
         final long now = time.milliseconds();
         final Call call = new Call(
-                "removeRaftVoter", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
+            "removeRaftVoter", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
 
             @Override
             RemoveRaftVoterRequest.Builder createRequest(int timeoutMs) {
                 return new RemoveRaftVoterRequest.Builder(
                     new RemoveRaftVoterRequestData().
                         setClusterId(options.clusterId().orElse(null)).
-                        setVoterId(voterId) .
+                        setVoterId(voterId).
                         setVoterDirectoryId(voterDirectoryId));
             }
 
@@ -4995,8 +5036,8 @@ public class KafkaAdminClient extends AdminClient {
                 RemoveRaftVoterResponse addResponse = 
(RemoveRaftVoterResponse) response;
                 if (addResponse.data().errorCode() != Errors.NONE.code()) {
                     ApiError error = new ApiError(
-                            addResponse.data().errorCode(),
-                            addResponse.data().errorMessage());
+                        addResponse.data().errorCode(),
+                        addResponse.data().errorMessage());
                     future.completeExceptionally(error.exception());
                 } else {
                     future.complete(null);
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesOptions.java
index 7b6dbf302c6..f90778db12c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesOptions.java
@@ -19,6 +19,8 @@ package org.apache.kafka.clients.admin;
 
 /**
  * Options for {@link Admin#listClientMetricsResources()}.
+ * @deprecated Since 4.1. Use {@link ListConfigResourcesOptions} instead.
  */
+@Deprecated(since = "4.1")
 public class ListClientMetricsResourcesOptions extends AbstractOptions<ListClientMetricsResourcesOptions> {
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesResult.java
index 4a63e31c238..a4d0ed3cecb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesResult.java
@@ -25,7 +25,9 @@ import java.util.Collection;
 /**
  * The result of the {@link Admin#listClientMetricsResources()} call.
  * <p>
+ * @deprecated Since 4.1. Use {@link ListConfigResourcesResult} instead.
  */
+@Deprecated(since = "4.1")
 public class ListClientMetricsResourcesResult {
     private final KafkaFuture<Collection<ClientMetricsResourceListing>> future;
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConfigResourcesOptions.java
similarity index 83%
copy from clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesOptions.java
copy to clients/src/main/java/org/apache/kafka/clients/admin/ListConfigResourcesOptions.java
index 7b6dbf302c6..dbd8581c795 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConfigResourcesOptions.java
@@ -14,11 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.clients.admin;
 
 /**
- * Options for {@link Admin#listClientMetricsResources()}.
+ * Options for {@link Admin#listConfigResources()}.
  */
-public class ListClientMetricsResourcesOptions extends AbstractOptions<ListClientMetricsResourcesOptions> {
+public class ListConfigResourcesOptions extends AbstractOptions<ListConfigResourcesOptions> {
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConfigResourcesResult.java
similarity index 69%
copy from clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesResult.java
copy to clients/src/main/java/org/apache/kafka/clients/admin/ListConfigResourcesResult.java
index 4a63e31c238..fa9ad46a72c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConfigResourcesResult.java
@@ -14,39 +14,38 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 
 import java.util.Collection;
 
 /**
- * The result of the {@link Admin#listClientMetricsResources()} call.
+ * The result of the {@link Admin#listConfigResources()} call.
  * <p>
  */
-public class ListClientMetricsResourcesResult {
-    private final KafkaFuture<Collection<ClientMetricsResourceListing>> future;
+public class ListConfigResourcesResult {
+    private final KafkaFuture<Collection<ConfigResource>> future;
 
-    ListClientMetricsResourcesResult(KafkaFuture<Collection<ClientMetricsResourceListing>> future) {
+    ListConfigResourcesResult(KafkaFuture<Collection<ConfigResource>> future) {
         this.future = future;
     }
 
     /**
-     * Returns a future that yields either an exception, or the full set of client metrics
-     * listings.
+     * Returns a future that yields either an exception, or the full set of config resources.
      *
      * In the event of a failure, the future yields nothing but the first exception which
      * occurred.
      */
-    public KafkaFuture<Collection<ClientMetricsResourceListing>> all() {
-        final KafkaFutureImpl<Collection<ClientMetricsResourceListing>> result = new KafkaFutureImpl<>();
-        future.whenComplete((listings, throwable) -> {
+    public KafkaFuture<Collection<ConfigResource>> all() {
+        final KafkaFutureImpl<Collection<ConfigResource>> result = new KafkaFutureImpl<>();
+        future.whenComplete((resources, throwable) -> {
             if (throwable != null) {
                 result.completeExceptionally(throwable);
             } else {
-                result.complete(listings);
+                result.complete(resources);
             }
         });
         return result;
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesRequest.java
index 3af70938843..436d08c4909 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesRequest.java
@@ -17,12 +17,14 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.ListConfigResourcesRequestData;
 import org.apache.kafka.common.message.ListConfigResourcesResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 
+import java.util.HashSet;
 import java.util.Set;
 
 public class ListConfigResourcesRequest extends AbstractRequest {
@@ -36,6 +38,15 @@ public class ListConfigResourcesRequest extends 
AbstractRequest {
 
         @Override
         public ListConfigResourcesRequest build(short version) {
+            if (version == 0) {
+                // The v0 only supports CLIENT_METRICS resource type.
+                Set<Byte> resourceTypes = new HashSet<>(data.resourceTypes());
+                if (resourceTypes.size() != 1 || !resourceTypes.contains(ConfigResource.Type.CLIENT_METRICS.id())) {
+                    throw new UnsupportedVersionException("The v0 ListConfigResources only supports CLIENT_METRICS");
+                }
+                // The v0 request does not have resource types field, so creating a new request data.
+                return new ListConfigResourcesRequest(new ListConfigResourcesRequestData(), version);
+            }
             return new ListConfigResourcesRequest(data, version);
         }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesResponse.java
index 36a4a807f7f..f9fa50d02a9 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesResponse.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.clients.admin.ClientMetricsResourceListing;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.message.ListConfigResourcesResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -78,12 +77,4 @@ public class ListConfigResourcesResponse extends 
AbstractResponse {
                 )
             ).collect(Collectors.toList());
     }
-
-    public Collection<ClientMetricsResourceListing> clientMetricsResources() {
-        return data.configResources()
-            .stream()
-            .filter(entry -> entry.resourceType() == 
ConfigResource.Type.CLIENT_METRICS.id())
-            .map(entry -> new 
ClientMetricsResourceListing(entry.resourceName()))
-            .collect(Collectors.toList());
-    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
index c98ffb9483f..36e7571d8dd 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
@@ -163,17 +163,17 @@ public class AdminClientTestUtils {
         return new 
ListConsumerGroupOffsetsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group),
 future));
     }
 
-    public static ListClientMetricsResourcesResult 
listClientMetricsResourcesResult(String... names) {
-        return new ListClientMetricsResourcesResult(
-                KafkaFuture.completedFuture(Arrays.stream(names)
-                        .map(ClientMetricsResourceListing::new)
-                        .collect(Collectors.toList())));
+    public static ListConfigResourcesResult 
listConfigResourcesResult(String... names) {
+        return new ListConfigResourcesResult(
+            KafkaFuture.completedFuture(Arrays.stream(names)
+                .map(name -> new 
ConfigResource(ConfigResource.Type.CLIENT_METRICS, name))
+                .collect(Collectors.toList())));
     }
 
-    public static ListClientMetricsResourcesResult 
listClientMetricsResourcesResult(KafkaException exception) {
-        final KafkaFutureImpl<Collection<ClientMetricsResourceListing>> future 
= new KafkaFutureImpl<>();
+    public static ListConfigResourcesResult 
listConfigResourcesResult(KafkaException exception) {
+        final KafkaFutureImpl<Collection<ConfigResource>> future = new 
KafkaFutureImpl<>();
         future.completeExceptionally(exception);
-        return new ListClientMetricsResourcesResult(future);
+        return new ListConfigResourcesResult(future);
     }
 
     public static ListShareGroupOffsetsResult 
createListShareGroupOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition, 
OffsetAndMetadata>>> groupOffsets) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 83356b68ff5..d7139be1698 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -10664,6 +10664,7 @@ public class KafkaAdminClientTest {
                                           member.memberEpoch());
     }
 
+    @SuppressWarnings({"deprecation", "removal"})
     @Test
     public void testListClientMetricsResources() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
@@ -10697,6 +10698,7 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @SuppressWarnings({"deprecation", "removal"})
     @Test
     public void testListClientMetricsResourcesEmpty() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
@@ -10714,6 +10716,7 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @SuppressWarnings({"deprecation", "removal"})
     @Test
     public void testListClientMetricsResourcesNotSupported() {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
@@ -10729,6 +10732,70 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testListConfigResources() throws Exception {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            List<ConfigResource> expected = List.of(
+                new ConfigResource(ConfigResource.Type.CLIENT_METRICS, 
"client-metrics"),
+                new ConfigResource(ConfigResource.Type.BROKER, "1"),
+                new ConfigResource(ConfigResource.Type.BROKER_LOGGER, "1"),
+                new ConfigResource(ConfigResource.Type.TOPIC, "topic"),
+                new ConfigResource(ConfigResource.Type.GROUP, "group")
+            );
+
+            ListConfigResourcesResponseData responseData =
+                new 
ListConfigResourcesResponseData().setErrorCode(Errors.NONE.code());
+
+            expected.forEach(c ->
+                responseData.configResources()
+                    .add(new ListConfigResourcesResponseData
+                        .ConfigResource()
+                        .setResourceName(c.name())
+                        .setResourceType(c.type().id())
+                    )
+            );
+
+            env.kafkaClient().prepareResponse(
+                request -> request instanceof ListConfigResourcesRequest,
+                new ListConfigResourcesResponse(responseData));
+
+            ListConfigResourcesResult result = 
env.adminClient().listConfigResources();
+            assertEquals(expected.size(), result.all().get().size());
+            assertEquals(new HashSet<>(expected), new 
HashSet<>(result.all().get()));
+        }
+    }
+
+    @Test
+    public void testListConfigResourcesEmpty() throws Exception {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            ListConfigResourcesResponseData responseData =
+                new 
ListConfigResourcesResponseData().setErrorCode(Errors.NONE.code());
+
+            env.kafkaClient().prepareResponse(
+                request -> request instanceof ListConfigResourcesRequest,
+                new ListConfigResourcesResponse(responseData));
+
+            ListConfigResourcesResult result = 
env.adminClient().listConfigResources();
+            assertTrue(result.all().get().isEmpty());
+        }
+    }
+
+    @Test
+    public void testListConfigResourcesNotSupported() {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().prepareResponse(
+                request -> request instanceof ListConfigResourcesRequest,
+                new ListConfigResourcesResponse(new 
ListConfigResourcesResponseData()
+                    .setErrorCode(Errors.UNSUPPORTED_VERSION.code())));
+
+            ListConfigResourcesResult result = 
env.adminClient().listConfigResources(
+                Set.of(ConfigResource.Type.UNKNOWN), new 
ListConfigResourcesOptions());
+
+            assertNotNull(result.all());
+            TestUtils.assertFutureThrows(UnsupportedVersionException.class, 
result.all());
+        }
+    }
+
     @Test
     public void 
testCallFailWithUnsupportedVersionExceptionDoesNotHaveConcurrentModificationException()
 throws InterruptedException {
         Cluster cluster = mockCluster(1, 0);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 02a9e628b7e..37e18e15983 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -1394,6 +1394,12 @@ public class MockAdminClient extends AdminClient {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
+    @Override
+    public ListConfigResourcesResult 
listConfigResources(Set<ConfigResource.Type> configResourceTypes, 
ListConfigResourcesOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @SuppressWarnings("deprecation")
     @Override
     public ListClientMetricsResourcesResult 
listClientMetricsResources(ListClientMetricsResourcesOptions options) {
         KafkaFutureImpl<Collection<ClientMetricsResourceListing>> future = new 
KafkaFutureImpl<>();
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index aa50f9db018..7a3be68ff78 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -3637,7 +3637,10 @@ public class RequestResponseTest {
     }
 
     private ListConfigResourcesRequest createListConfigResourcesRequest(short 
version) {
-        return new ListConfigResourcesRequest.Builder(new 
ListConfigResourcesRequestData()).build(version);
+        return version == 0 ?
+            new ListConfigResourcesRequest.Builder(new 
ListConfigResourcesRequestData()
+                
.setResourceTypes(List.of(ConfigResource.Type.CLIENT_METRICS.id()))).build(version)
 :
+            new ListConfigResourcesRequest.Builder(new 
ListConfigResourcesRequestData()).build(version);
     }
 
     private ListConfigResourcesResponse createListConfigResourcesResponse() {
@@ -3951,4 +3954,25 @@ public class RequestResponseTest {
                 parseRequest(SASL_AUTHENTICATE, 
SASL_AUTHENTICATE.latestVersion(), accessor)).getMessage();
         assertEquals("Error reading byte array of 32767 byte(s): only 3 
byte(s) available", msg);
     }
+
+    @Test
+    public void 
testListConfigResourcesRequestV0FailsWithConfigResourceTypeOtherThanClientMetrics()
 {
+        // One type which is not CLIENT_METRICS
+        Arrays.stream(ConfigResource.Type.values())
+            .filter(t -> t != ConfigResource.Type.CLIENT_METRICS)
+            .forEach(t -> {
+                ListConfigResourcesRequestData data = new 
ListConfigResourcesRequestData()
+                    .setResourceTypes(List.of(t.id()));
+                assertThrows(UnsupportedVersionException.class, () -> new 
ListConfigResourcesRequest.Builder(data).build((short) 0));
+            });
+
+        // Multiple types with CLIENT_METRICS
+        Arrays.stream(ConfigResource.Type.values())
+            .filter(t -> t != ConfigResource.Type.CLIENT_METRICS)
+            .forEach(t -> {
+                ListConfigResourcesRequestData data = new 
ListConfigResourcesRequestData()
+                    .setResourceTypes(List.of(t.id(), 
ConfigResource.Type.CLIENT_METRICS.id()));
+                assertThrows(UnsupportedVersionException.class, () -> new 
ListConfigResourcesRequest.Builder(data).build((short) 0));
+            });
+    }
 }
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 423e869ffd0..be0c9dea17b 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -3866,6 +3866,92 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     } finally client.close(time.Duration.ZERO)
   }
 
+  @Test
+  def testListConfigResources(): Unit = {
+    client = createAdminClient
+
+    // Alter group and client metric config to add group and client metric 
config resource
+    val clientMetric = "client-metrics"
+    val group = "group"
+    val clientMetricResource = new 
ConfigResource(ConfigResource.Type.CLIENT_METRICS, clientMetric)
+    val groupResource = new ConfigResource(ConfigResource.Type.GROUP, group)
+    val alterResult = client.incrementalAlterConfigs(util.Map.of(
+      clientMetricResource,
+      util.Set.of(new AlterConfigOp(new ConfigEntry("interval.ms", "111"), 
AlterConfigOp.OpType.SET)),
+      groupResource,
+      util.Set.of(new AlterConfigOp(new 
ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "50000"), 
AlterConfigOp.OpType.SET))
+    ))
+    assertEquals(util.Set.of(clientMetricResource, groupResource), 
alterResult.values.keySet)
+    alterResult.all.get(15, TimeUnit.SECONDS)
+
+    ensureConsistentKRaftMetadata()
+
+    // non-specified config resource type retrieves all config resources
+    var configResources = client.listConfigResources().all().get()
+    assertEquals(9, configResources.size())
+    brokerServers.foreach(b => {
+      assertTrue(configResources.contains(new 
ConfigResource(ConfigResource.Type.BROKER, b.config.nodeId.toString)))
+      assertTrue(configResources.contains(new 
ConfigResource(ConfigResource.Type.BROKER_LOGGER, b.config.nodeId.toString)))
+    })
+    assertTrue(configResources.contains(new 
ConfigResource(ConfigResource.Type.TOPIC, Topic.GROUP_METADATA_TOPIC_NAME)))
+    assertTrue(configResources.contains(groupResource))
+    assertTrue(configResources.contains(clientMetricResource))
+
+    // BROKER config resource type retrieves only broker config resources
+    configResources = 
client.listConfigResources(util.Set.of(ConfigResource.Type.BROKER), new 
ListConfigResourcesOptions()).all().get()
+    assertEquals(3, configResources.size())
+    brokerServers.foreach(b => {
+      assertTrue(configResources.contains(new 
ConfigResource(ConfigResource.Type.BROKER, b.config.nodeId.toString)))
+      assertFalse(configResources.contains(new 
ConfigResource(ConfigResource.Type.BROKER_LOGGER, b.config.nodeId.toString)))
+    })
+    assertFalse(configResources.contains(new 
ConfigResource(ConfigResource.Type.TOPIC, Topic.GROUP_METADATA_TOPIC_NAME)))
+    assertFalse(configResources.contains(groupResource))
+    assertFalse(configResources.contains(clientMetricResource))
+
+    // BROKER_LOGGER config resource type retrieves only broker logger config 
resources
+    configResources = 
client.listConfigResources(util.Set.of(ConfigResource.Type.BROKER_LOGGER), new 
ListConfigResourcesOptions()).all().get()
+    assertEquals(3, configResources.size())
+    brokerServers.foreach(b => {
+      assertFalse(configResources.contains(new 
ConfigResource(ConfigResource.Type.BROKER, b.config.nodeId.toString)))
+      assertTrue(configResources.contains(new 
ConfigResource(ConfigResource.Type.BROKER_LOGGER, b.config.nodeId.toString)))
+    })
+    assertFalse(configResources.contains(new 
ConfigResource(ConfigResource.Type.TOPIC, Topic.GROUP_METADATA_TOPIC_NAME)))
+    assertFalse(configResources.contains(groupResource))
+    assertFalse(configResources.contains(clientMetricResource))
+
+    // TOPIC config resource type retrieves only topic config resources
+    configResources = 
client.listConfigResources(util.Set.of(ConfigResource.Type.TOPIC), new 
ListConfigResourcesOptions()).all().get()
+    assertEquals(1, configResources.size())
+    assertTrue(configResources.contains(new 
ConfigResource(ConfigResource.Type.TOPIC, Topic.GROUP_METADATA_TOPIC_NAME)))
+
+    // GROUP config resource type retrieves only group config resources
+    configResources = 
client.listConfigResources(util.Set.of(ConfigResource.Type.GROUP), new 
ListConfigResourcesOptions()).all().get()
+    assertEquals(1, configResources.size())
+    assertTrue(configResources.contains(groupResource))
+
+    // CLIENT_METRICS config resource type retrieves only client metric config 
resources
+    configResources = 
client.listConfigResources(util.Set.of(ConfigResource.Type.CLIENT_METRICS), new 
ListConfigResourcesOptions()).all().get()
+    assertEquals(1, configResources.size())
+    assertTrue(configResources.contains(clientMetricResource))
+
+    // UNKNOWN config resource type gets UNSUPPORTED_VERSION error
+    assertThrows(classOf[ExecutionException], () => {
+      client.listConfigResources(util.Set.of(ConfigResource.Type.UNKNOWN), new 
ListConfigResourcesOptions()).all().get()
+    })
+  }
+
+  @Test
+  @Timeout(30)
+  def testListConfigResourcesTimeoutMs(): Unit = {
+    client = createInvalidAdminClient()
+    try {
+      val timeoutOption = new ListConfigResourcesOptions().timeoutMs(0)
+      val exception = assertThrows(classOf[ExecutionException], () =>
+        client.listConfigResources(util.Set.of(), timeoutOption).all().get())
+      assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    } finally client.close(time.Duration.ZERO)
+  }
+
   /**
    * Test that createTopics returns the dynamic configurations of the topics 
that were created.
    *
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index f698c8ef9ef..93d138e8946 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -11207,7 +11207,8 @@ class KafkaApisTest extends Logging {
 
   @Test
   def testListConfigResourcesV0(): Unit = {
-    val request = buildRequest(new ListConfigResourcesRequest.Builder(new 
ListConfigResourcesRequestData()).build(0))
+    val request = buildRequest(new ListConfigResourcesRequest.Builder(
+      new 
ListConfigResourcesRequestData().setResourceTypes(util.List.of(ConfigResource.Type.CLIENT_METRICS.id))).build(0))
     metadataCache = mock(classOf[KRaftMetadataCache])
 
     val resources = util.Set.of("client-metric1", "client-metric2")
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
index 91871133d68..ae8e9f0924c 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
@@ -110,6 +110,8 @@ import org.apache.kafka.clients.admin.FenceProducersOptions;
 import org.apache.kafka.clients.admin.FenceProducersResult;
 import org.apache.kafka.clients.admin.ListClientMetricsResourcesOptions;
 import org.apache.kafka.clients.admin.ListClientMetricsResourcesResult;
+import org.apache.kafka.clients.admin.ListConfigResourcesOptions;
+import org.apache.kafka.clients.admin.ListConfigResourcesResult;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
@@ -436,6 +438,12 @@ public class TestingMetricsInterceptingAdminClient extends AdminClient {
         return adminDelegate.fenceProducers(transactionalIds, options);
     }
 
+    @Override
+    public ListConfigResourcesResult listConfigResources(final Set<ConfigResource.Type> configResourceTypes, final ListConfigResourcesOptions options) {
+        return adminDelegate.listConfigResources(configResourceTypes, options);
+    }
+
+    @SuppressWarnings({"deprecation", "removal"})
     @Override
     public ListClientMetricsResourcesResult listClientMetricsResources(final ListClientMetricsResourcesOptions options) {
         return adminDelegate.listClientMetricsResources(options);
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java b/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java
index f9b02f78397..8dcb5e5a750 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java
@@ -21,9 +21,9 @@ import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.clients.admin.AlterConfigsOptions;
-import org.apache.kafka.clients.admin.ClientMetricsResourceListing;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.ListConfigResourcesOptions;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.utils.Exit;
@@ -43,6 +43,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -157,9 +158,11 @@ public class ClientMetricsCommand {
             if (entityNameOpt.isPresent()) {
                 entities = Collections.singletonList(entityNameOpt.get());
             } else {
-                Collection<ClientMetricsResourceListing> resources = adminClient.listClientMetricsResources()
-                        .all().get(30, TimeUnit.SECONDS);
-                entities = resources.stream().map(ClientMetricsResourceListing::name).collect(Collectors.toList());
+                Collection<ConfigResource> resources = adminClient
+                    .listConfigResources(Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions())
+                    .all()
+                    .get(30, TimeUnit.SECONDS);
+                entities = resources.stream().map(ConfigResource::name).toList();
             }
 
             for (String entity : entities) {
@@ -170,9 +173,11 @@ public class ClientMetricsCommand {
         }
 
         public void listClientMetrics() throws Exception {
-            Collection<ClientMetricsResourceListing> resources = adminClient.listClientMetricsResources()
-                    .all().get(30, TimeUnit.SECONDS);
-            String results = resources.stream().map(ClientMetricsResourceListing::name).collect(Collectors.joining("\n"));
+            Collection<ConfigResource> resources = adminClient
+                .listConfigResources(Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions())
+                .all()
+                .get(30, TimeUnit.SECONDS);
+            String results = resources.stream().map(ConfigResource::name).collect(Collectors.joining("\n"));
             System.out.println(results);
         }
 
diff --git a/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java
index c58748bf3c0..2fcf082f0a0 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.clients.admin.AlterConfigsResult;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.DescribeConfigsResult;
-import org.apache.kafka.clients.admin.ListClientMetricsResourcesResult;
+import org.apache.kafka.clients.admin.ListConfigResourcesResult;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.utils.Exit;
@@ -254,8 +254,8 @@ public class ClientMetricsCommandTest {
         Admin adminClient = mock(Admin.class);
         ClientMetricsCommand.ClientMetricsService service = new ClientMetricsCommand.ClientMetricsService(adminClient);
 
-        ListClientMetricsResourcesResult result = AdminClientTestUtils.listClientMetricsResourcesResult(clientMetricsName);
-        when(adminClient.listClientMetricsResources()).thenReturn(result);
+        ListConfigResourcesResult result = AdminClientTestUtils.listConfigResourcesResult(clientMetricsName);
+        when(adminClient.listConfigResources(any(), any())).thenReturn(result);
         ConfigResource cr = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, clientMetricsName);
         Config cfg = new Config(Collections.singleton(new ConfigEntry("metrics", "org.apache.kafka.producer.")));
         DescribeConfigsResult describeResult = AdminClientTestUtils.describeConfigsResult(cr, cfg);
@@ -278,8 +278,8 @@ public class ClientMetricsCommandTest {
         Admin adminClient = mock(Admin.class);
         ClientMetricsCommand.ClientMetricsService service = new ClientMetricsCommand.ClientMetricsService(adminClient);
 
-        ListClientMetricsResourcesResult result = AdminClientTestUtils.listClientMetricsResourcesResult("one", "two");
-        when(adminClient.listClientMetricsResources()).thenReturn(result);
+        ListConfigResourcesResult result = AdminClientTestUtils.listConfigResourcesResult("one", "two");
+        when(adminClient.listConfigResources(any(), any())).thenReturn(result);
 
         String capturedOutput = ToolsTestUtils.captureStandardOut(() -> {
             try {
@@ -296,8 +296,8 @@ public class ClientMetricsCommandTest {
         Admin adminClient = mock(Admin.class);
         ClientMetricsCommand.ClientMetricsService service = new ClientMetricsCommand.ClientMetricsService(adminClient);
 
-        ListClientMetricsResourcesResult result = AdminClientTestUtils.listClientMetricsResourcesResult(Errors.UNSUPPORTED_VERSION.exception());
-        when(adminClient.listClientMetricsResources()).thenReturn(result);
+        ListConfigResourcesResult result = AdminClientTestUtils.listConfigResourcesResult(Errors.UNSUPPORTED_VERSION.exception());
+        when(adminClient.listConfigResources(any(), any())).thenReturn(result);
 
         assertThrows(ExecutionException.class, () -> service.listClientMetrics());
     }

Reply via email to