This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 30d7c71f09d KAFKA-18904: Add Admin#listConfigResources [2/N] (#19743)
30d7c71f09d is described below
commit 30d7c71f09d1f87daad03f44313f172f8b944f5a
Author: PoAn Yang <[email protected]>
AuthorDate: Thu May 22 10:05:35 2025 -0500
KAFKA-18904: Add Admin#listConfigResources [2/N] (#19743)
* Add new functions `listConfigResources(Set<ConfigResource.Type>
  configResourceTypes, ListConfigResourcesOptions options)` and
  `listConfigResources()` to the `Admin` interface (usage sketch below).
* The new functions can list config resources of all supported types.
* If the input set contains a type other than `CLIENT_METRICS` and the
  request version is 0, the call fails with `UnsupportedVersionException`.
* Deprecate functions
  `listClientMetricsResources(ListClientMetricsResourcesOptions options)`
  and `listClientMetricsResources()`.
* Deprecate classes `ListClientMetricsResourcesResult` and
  `ClientMetricsResourceListing`.
* Change `ClientMetricsCommand` to use `listConfigResources`.
* Add integration tests to `PlaintextAdminIntegrationTest.java`.
* Add unit tests to `KafkaAdminClientTest.java`.
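
For illustration, a minimal sketch of how a caller might use the new API,
assuming a broker at localhost:9092 and that `ListConfigResourcesResult#all()`
exposes the matched `ConfigResource` objects:

    import java.util.Properties;
    import java.util.Set;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListConfigResourcesOptions;
    import org.apache.kafka.common.config.ConfigResource;
    import org.apache.kafka.common.errors.UnsupportedVersionException;

    public class ListConfigResourcesExample {
        public static void main(String[] args) throws InterruptedException {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                // An empty type set lists config resources of every supported
                // type; the no-arg overload delegates to exactly this call.
                for (ConfigResource resource :
                        admin.listConfigResources(Set.of(), new ListConfigResourcesOptions())
                            .all().get()) {
                    System.out.println(resource.type() + " : " + resource.name());
                }
            } catch (ExecutionException e) {
                // Brokers that only support request version 0 can list
                // CLIENT_METRICS resources only; other types fail with
                // UnsupportedVersionException.
                if (e.getCause() instanceof UnsupportedVersionException) {
                    System.err.println("Broker cannot list all config resource types");
                }
            }
        }
    }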
Reviewers: Andrew Schofield <[email protected]>
---------
Signed-off-by: PoAn Yang <[email protected]>
---
.../java/org/apache/kafka/clients/admin/Admin.java | 26 +
.../admin/ClientMetricsResourceListing.java | 1 +
.../kafka/clients/admin/ForwardingAdmin.java | 6 +
.../kafka/clients/admin/KafkaAdminClient.java | 659 +++++++++++----------
.../admin/ListClientMetricsResourcesOptions.java | 2 +
.../admin/ListClientMetricsResourcesResult.java | 2 +
...ptions.java => ListConfigResourcesOptions.java} | 5 +-
...sResult.java => ListConfigResourcesResult.java} | 21 +-
.../requests/ListConfigResourcesRequest.java | 11 +
.../requests/ListConfigResourcesResponse.java | 9 -
.../kafka/clients/admin/AdminClientTestUtils.java | 16 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 67 +++
.../kafka/clients/admin/MockAdminClient.java | 6 +
.../kafka/common/requests/RequestResponseTest.java | 26 +-
.../kafka/api/PlaintextAdminIntegrationTest.scala | 86 +++
.../scala/unit/kafka/server/KafkaApisTest.scala | 3 +-
.../TestingMetricsInterceptingAdminClient.java | 8 +
.../apache/kafka/tools/ClientMetricsCommand.java | 19 +-
.../kafka/tools/ClientMetricsCommandTest.java | 14 +-
19 files changed, 631 insertions(+), 356 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 84553dbe50a..3f90405bc65 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -1775,12 +1775,36 @@ public interface Admin extends AutoCloseable {
FenceProducersResult fenceProducers(Collection<String> transactionalIds,
FenceProducersOptions options);
+    /**
+     * List the configuration resources available in the cluster which match the given
+     * config resource types. If no config resource types are specified, all
+     * configuration resources will be listed.
+     *
+     * @param configResourceTypes The set of configuration resource types to list.
+     * @param options             The options to use when listing the configuration resources.
+     * @return The ListConfigResourcesResult.
+     */
+    ListConfigResourcesResult listConfigResources(Set<ConfigResource.Type> configResourceTypes, ListConfigResourcesOptions options);
+
+    /**
+     * List all configuration resources available in the cluster with the default options.
+     * <p>
+     * This is a convenience method for {@link #listConfigResources(Set, ListConfigResourcesOptions)}
+     * with default options. See the overload for more details.
+     *
+     * @return The ListConfigResourcesResult.
+     */
+    default ListConfigResourcesResult listConfigResources() {
+        return listConfigResources(Set.of(), new ListConfigResourcesOptions());
+    }
+
    /**
     * List the client metrics configuration resources available in the cluster.
     *
     * @param options The options to use when listing the client metrics resources.
     * @return The ListClientMetricsResourcesResult.
+     * @deprecated Since 4.1. Use {@link #listConfigResources(Set, ListConfigResourcesOptions)} instead.
     */
+    @Deprecated(since = "4.1", forRemoval = true)
    ListClientMetricsResourcesResult listClientMetricsResources(ListClientMetricsResourcesOptions options);
/**
@@ -1790,7 +1814,9 @@ public interface Admin extends AutoCloseable {
* with default options. See the overload for more details.
*
* @return The ListClientMetricsResourcesResult.
+ * @deprecated Since 4.1. Use {@link #listConfigResources()} instead.
*/
+ @Deprecated(since = "4.1", forRemoval = true)
default ListClientMetricsResourcesResult listClientMetricsResources() {
        return listClientMetricsResources(new ListClientMetricsResourcesOptions());
}
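
For the deprecated path, a minimal migration sketch (same assumption about
`ListConfigResourcesResult#all()` as in the sketch above; the helper name is
hypothetical):

    // Replaces the deprecated admin.listClientMetricsResources().all().get():
    // list only CLIENT_METRICS config resources via the new API.
    static Collection<ConfigResource> listClientMetricsConfigResources(Admin admin)
            throws InterruptedException, ExecutionException {
        return admin.listConfigResources(
                Set.of(ConfigResource.Type.CLIENT_METRICS),
                new ListConfigResourcesOptions())
            .all().get();
    }
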
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ClientMetricsResourceListing.java b/clients/src/main/java/org/apache/kafka/clients/admin/ClientMetricsResourceListing.java
index b5c85b58732..d5af97080b0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ClientMetricsResourceListing.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ClientMetricsResourceListing.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.admin;
import java.util.Objects;
+@Deprecated(since = "4.1")
public class ClientMetricsResourceListing {
private final String name;
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
index 5ac988e0acd..b99e4f6587b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
@@ -300,6 +300,12 @@ public class ForwardingAdmin implements Admin {
return delegate.fenceProducers(transactionalIds, options);
}
+    @Override
+    public ListConfigResourcesResult listConfigResources(Set<ConfigResource.Type> configResourceTypes, ListConfigResourcesOptions options) {
+        return delegate.listConfigResources(configResourceTypes, options);
+    }
+
+ @SuppressWarnings({"deprecation", "removal"})
@Override
    public ListClientMetricsResourcesResult listClientMetricsResources(ListClientMetricsResourcesOptions options) {
return delegate.listClientMetricsResources(options);
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 0abad889c85..b283d65cbee 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -419,11 +419,11 @@ public class KafkaAdminClient extends AdminClient {
/**
* Get or create a list value from a map.
*
- * @param map The map to get or create the element from.
- * @param key The key.
- * @param <K> The key type.
- * @param <V> The value type.
- * @return The list value.
+ * @param map The map to get or create the element from.
+ * @param key The key.
+ * @param <K> The key type.
+ * @param <V> The value type.
+ * @return The list value.
*/
static <K, V> List<V> getOrCreateListValue(Map<K, List<V>> map, K key) {
return map.computeIfAbsent(key, k -> new LinkedList<>());
@@ -432,9 +432,9 @@ public class KafkaAdminClient extends AdminClient {
/**
* Send an exception to every element in a collection of KafkaFutureImpls.
*
- * @param futures The collection of KafkaFutureImpl objects.
- * @param exc The exception
- * @param <T> The KafkaFutureImpl result type.
+ * @param futures The collection of KafkaFutureImpl objects.
+ * @param exc The exception
+ * @param <T> The KafkaFutureImpl result type.
*/
private static <T> void
completeAllExceptionally(Collection<KafkaFutureImpl<T>> futures, Throwable exc)
{
completeAllExceptionally(futures.stream(), exc);
@@ -443,9 +443,9 @@ public class KafkaAdminClient extends AdminClient {
/**
* Send an exception to all futures in the provided stream
*
- * @param futures The stream of KafkaFutureImpl objects.
- * @param exc The exception
- * @param <T> The KafkaFutureImpl result type.
+ * @param futures The stream of KafkaFutureImpl objects.
+ * @param exc The exception
+ * @param <T> The KafkaFutureImpl result type.
*/
private static <T> void
completeAllExceptionally(Stream<KafkaFutureImpl<T>> futures, Throwable exc) {
futures.forEach(future -> future.completeExceptionally(exc));
@@ -454,9 +454,9 @@ public class KafkaAdminClient extends AdminClient {
/**
* Get the current time remaining before a deadline as an integer.
*
- * @param now The current time in milliseconds.
- * @param deadlineMs The deadline time in milliseconds.
- * @return The time delta in milliseconds.
+ * @param now The current time in milliseconds.
+ * @param deadlineMs The deadline time in milliseconds.
+ * @return The time delta in milliseconds.
*/
static int calcTimeoutMsRemainingAsInt(long now, long deadlineMs) {
long deltaMs = deadlineMs - now;
@@ -470,9 +470,8 @@ public class KafkaAdminClient extends AdminClient {
/**
* Generate the client id based on the configuration.
*
- * @param config The configuration
- *
- * @return The client id
+ * @param config The configuration
+ * @return The client id
*/
static String generateClientId(AdminClientConfig config) {
String clientId = config.getString(AdminClientConfig.CLIENT_ID_CONFIG);
@@ -488,10 +487,9 @@ public class KafkaAdminClient extends AdminClient {
/**
* Get the deadline for a particular call.
*
- * @param now The current time in milliseconds.
- * @param optionTimeoutMs The timeout option given by the user.
- *
- * @return The deadline in milliseconds.
+ * @param now The current time in milliseconds.
+ * @param optionTimeoutMs The timeout option given by the user.
+ * @return The deadline in milliseconds.
*/
private long calcDeadlineMs(long now, Integer optionTimeoutMs) {
if (optionTimeoutMs != null)
@@ -502,9 +500,8 @@ public class KafkaAdminClient extends AdminClient {
/**
* Pretty-print an exception.
*
- * @param throwable The exception.
- *
- * @return A compact human-readable string.
+ * @param throwable The exception.
+ * @return A compact human-readable string.
*/
static String prettyPrintException(Throwable throwable) {
if (throwable == null)
@@ -550,7 +547,7 @@ public class KafkaAdminClient extends AdminClient {
.recordLevel(Sensor.RecordingLevel.forName(config.getString(AdminClientConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricTags);
            MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
-                config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+            config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
metrics = new Metrics(metricConfig, reporters, time,
metricsContext);
networkClient = ClientUtils.createNetworkClient(config,
clientId,
@@ -656,11 +653,11 @@ public class KafkaAdminClient extends AdminClient {
if (defaultApiTimeoutMs < requestTimeoutMs) {
if
(config.originals().containsKey(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG))
{
throw new ConfigException("The specified value of " +
AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG +
- " must be no smaller than the value of " +
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG + ".");
+ " must be no smaller than the value of " +
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG + ".");
} else {
log.warn("Overriding the default value for {} ({}) with the
explicitly configured request timeout {}",
- AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG,
this.defaultApiTimeoutMs,
- requestTimeoutMs);
+ AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG,
this.defaultApiTimeoutMs,
+ requestTimeoutMs);
return requestTimeoutMs;
}
}
@@ -718,6 +715,7 @@ public class KafkaAdminClient extends AdminClient {
*/
private interface NodeProvider {
Node provide();
+
boolean supportsUseControllers();
}
@@ -727,7 +725,7 @@ public class KafkaAdminClient extends AdminClient {
long now = time.milliseconds();
LeastLoadedNode leastLoadedNode = client.leastLoadedNode(now);
if (metadataRecoveryStrategy ==
MetadataRecoveryStrategy.REBOOTSTRAP
- && !leastLoadedNode.hasNodeAvailableOrConnectionReady()) {
+ && !leastLoadedNode.hasNodeAvailableOrConnectionReady()) {
metadataManager.rebootstrap(now);
}
@@ -757,7 +755,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
public Node provide() {
if (metadataManager.isReady() &&
- (metadataManager.nodeById(nodeId) != null)) {
+ (metadataManager.nodeById(nodeId) != null)) {
return metadataManager.nodeById(nodeId);
}
// If we can't find the node with the given constant ID, we
schedule a
@@ -791,7 +789,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
public Node provide() {
if (metadataManager.isReady() &&
- (metadataManager.controller() != null)) {
+ (metadataManager.controller() != null)) {
return metadataManager.controller();
}
metadataManager.requestUpdate();
@@ -893,13 +891,13 @@ public class KafkaAdminClient extends AdminClient {
/**
* Handle a failure.
- *
+ * <p>
* Depending on what the exception is and how many times we have
already tried, we may choose to
* fail the Call, or retry it. It is important to print the stack
traces here in some cases,
* since they are not necessarily preserved in ApiVersionException
objects.
*
- * @param now The current time in milliseconds.
- * @param throwable The failure exception.
+ * @param now The current time in milliseconds.
+ * @param throwable The failure exception.
*/
final void fail(long now, Throwable throwable) {
if (curNode != null) {
@@ -915,7 +913,7 @@ public class KafkaAdminClient extends AdminClient {
// protocol downgrade will not count against the total number of
retries we get for
// this RPC. That is why 'tries' is not incremented.
            if ((throwable instanceof UnsupportedVersionException) &&
-                handleUnsupportedVersionException((UnsupportedVersionException) throwable)) {
+                    handleUnsupportedVersionException((UnsupportedVersionException) throwable)) {
log.debug("{} attempting protocol downgrade and then retry.",
this);
runnable.pendingCalls.add(this);
return;
@@ -969,16 +967,14 @@ public class KafkaAdminClient extends AdminClient {
* Create an AbstractRequest.Builder for this Call.
*
* @param timeoutMs The timeout in milliseconds.
- *
- * @return The AbstractRequest builder.
+ * @return The AbstractRequest builder.
*/
abstract AbstractRequest.Builder<?> createRequest(int timeoutMs);
/**
* Process the call response.
*
- * @param abstractResponse The AbstractResponse.
- *
+ * @param abstractResponse The AbstractResponse.
*/
abstract void handleResponse(AbstractResponse abstractResponse);
@@ -986,16 +982,15 @@ public class KafkaAdminClient extends AdminClient {
* Handle a failure. This will only be called if the failure exception
was not
* retriable, or if we hit a timeout.
*
- * @param throwable The exception.
+ * @param throwable The exception.
*/
abstract void handleFailure(Throwable throwable);
/**
* Handle an UnsupportedVersionException.
*
- * @param exception The exception.
- *
- * @return True if the exception can be handled; false
otherwise.
+ * @param exception The exception.
+ * @return True if the exception can be handled; false otherwise.
*/
boolean handleUnsupportedVersionException(UnsupportedVersionException
exception) {
return false;
@@ -1032,7 +1027,7 @@ public class KafkaAdminClient extends AdminClient {
/**
* Create a new timeout processor.
*
-         * @param now           The current time in milliseconds since the epoch.
+         * @param now The current time in milliseconds since the epoch.
*/
TimeoutProcessor(long now) {
this.now = now;
@@ -1044,9 +1039,8 @@ public class KafkaAdminClient extends AdminClient {
* Timed out calls will be removed and failed.
* The remaining milliseconds until the next timeout will be updated.
*
- * @param calls The collection of calls.
- *
- * @return The number of calls which were timed out.
+ * @param calls The collection of calls.
+ * @return The number of calls which were timed out.
*/
int handleTimeouts(Collection<Call> calls, String msg) {
int numTimedOut = 0;
@@ -1068,9 +1062,8 @@ public class KafkaAdminClient extends AdminClient {
* Check whether a call should be timed out.
* The remaining milliseconds until the next timeout will be updated.
*
- * @param call The call.
- *
- * @return True if the call should be timed out.
+ * @param call The call.
+ * @return True if the call should be timed out.
*/
boolean callHasExpired(Call call) {
int remainingMs = calcTimeoutMsRemainingAsInt(now,
call.deadlineMs);
@@ -1130,7 +1123,7 @@ public class KafkaAdminClient extends AdminClient {
/**
* Time out the elements in the pendingCalls list which are expired.
*
- * @param processor The timeout processor.
+ * @param processor The timeout processor.
*/
private void timeoutPendingCalls(TimeoutProcessor processor) {
int numTimedOut = processor.handleTimeouts(pendingCalls, "Timed
out waiting for a node assignment.");
@@ -1141,7 +1134,7 @@ public class KafkaAdminClient extends AdminClient {
/**
* Time out calls which have been assigned to nodes.
*
- * @param processor The timeout processor.
+ * @param processor The timeout processor.
*/
private int timeoutCallsToSend(TimeoutProcessor processor) {
int numTimedOut = 0;
@@ -1156,7 +1149,7 @@ public class KafkaAdminClient extends AdminClient {
/**
* Drain all the calls from newCalls into pendingCalls.
- *
+ * <p>
* This function holds the lock for the minimum amount of time, to
avoid blocking
* users of AdminClient who will also take the lock to add new calls.
*/
@@ -1168,7 +1161,7 @@ public class KafkaAdminClient extends AdminClient {
* Add some calls to pendingCalls, and then clear the input list.
* Also clears Call#curNode.
*
- * @param calls The calls to add.
+ * @param calls The calls to add.
*/
private void transitionToPendingAndClearList(List<Call> calls) {
for (Call call : calls) {
@@ -1181,9 +1174,9 @@ public class KafkaAdminClient extends AdminClient {
/**
* Choose nodes for the calls in the pendingCalls list.
*
- * @param now The current time in milliseconds.
- * @return The minimum time until a call is ready to be
retried if any of the pending
- * calls are backing off after a failure
+ * @param now The current time in milliseconds.
+ * @return The minimum time until a call is ready to be retried if any
of the pending
+ * calls are backing off after a failure
*/
private long maybeDrainPendingCalls(long now) {
long pollTimeout = Long.MAX_VALUE;
@@ -1241,8 +1234,8 @@ public class KafkaAdminClient extends AdminClient {
/**
* Send the calls which are ready.
*
- * @param now The current time in milliseconds.
- * @return The minimum timeout we need for poll().
+ * @param now The current time in milliseconds.
+ * @return The minimum timeout we need for poll().
*/
private long sendEligibleCalls(long now) {
long pollTimeout = Long.MAX_VALUE;
@@ -1264,7 +1257,7 @@ public class KafkaAdminClient extends AdminClient {
if (deadline != null) {
if (now >= deadline) {
log.info("Disconnecting from {} and revoking {}
node assignment(s) " +
- "because the node is taking too long to become
ready.",
+ "because the node is taking too long to
become ready.",
node.idString(), calls.size());
transitionToPendingAndClearList(calls);
client.disconnect(node.idString());
@@ -1317,12 +1310,12 @@ public class KafkaAdminClient extends AdminClient {
/**
* Time out expired calls that are in flight.
- *
+ * <p>
* Calls that are in flight may have been partially or completely sent
over the wire. They may
* even be in the process of being processed by the remote server. At
the moment, our only option
* to time them out is to close the entire connection.
*
- * @param processor The timeout processor.
+ * @param processor The timeout processor.
*/
private void timeoutCallsInFlight(TimeoutProcessor processor) {
int numTimedOut = 0;
@@ -1345,8 +1338,8 @@ public class KafkaAdminClient extends AdminClient {
/**
* Handle responses from the server.
*
- * @param now The current time in milliseconds.
- * @param responses The latest responses from KafkaClient.
+ * @param now The current time in milliseconds.
+ * @param responses The latest responses from KafkaClient.
*/
private void handleResponses(long now, List<ClientResponse> responses)
{
for (ClientResponse response : responses) {
@@ -1357,7 +1350,7 @@ public class KafkaAdminClient extends AdminClient {
// If the server returns information about a correlation
ID we didn't use yet,
// an internal server error has occurred. Close the
connection and log an error message.
log.error("Internal server error on {}: server returned
information about unknown " +
- "correlation ID {}, requestHeader = {}",
response.destination(), correlationId,
+ "correlation ID {}, requestHeader = {}",
response.destination(), correlationId,
response.requestHeader());
client.disconnect(response.destination());
continue;
@@ -1476,7 +1469,7 @@ public class KafkaAdminClient extends AdminClient {
numTimedOut += timeoutProcessor.handleTimeouts(pendingCalls,
"The AdminClient thread has exited.");
numTimedOut += timeoutCallsToSend(timeoutProcessor);
numTimedOut +=
timeoutProcessor.handleTimeouts(correlationIdToCalls.values(),
- "The AdminClient thread has exited.");
+ "The AdminClient thread has exited.");
if (numTimedOut > 0) {
log.info("Timed out {} remaining operation(s) during
close.", numTimedOut);
}
@@ -1546,13 +1539,13 @@ public class KafkaAdminClient extends AdminClient {
/**
* Queue a call for sending.
- *
+ * <p>
         * If the AdminClient thread has exited, this will fail. Otherwise, it will succeed (even
         * if the AdminClient is shutting down). This function should be called when retrying an
         * existing call.
*
- * @param call The new call object.
- * @param now The current time in milliseconds.
+ * @param call The new call object.
+ * @param now The current time in milliseconds.
*/
void enqueue(Call call, long now) {
if (call.tries > maxRetries) {
@@ -1583,18 +1576,18 @@ public class KafkaAdminClient extends AdminClient {
/**
* Initiate a new call.
- *
+ * <p>
* This will fail if the AdminClient is scheduled to shut down.
*
- * @param call The new call object.
- * @param now The current time in milliseconds.
+ * @param call The new call object.
+ * @param now The current time in milliseconds.
*/
void call(Call call, long now) {
if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) {
log.debug("Cannot accept new call {} when AdminClient is
closing.", call);
call.handleFailure(new IllegalStateException("Cannot accept
new calls when AdminClient is closing."));
} else if (metadataManager.usingBootstrapControllers() &&
- (!call.nodeProvider.supportsUseControllers())) {
+ (!call.nodeProvider.supportsUseControllers())) {
call.fail(now, new UnsupportedEndpointTypeException("This
Admin API is not " +
"yet supported when communicating directly with the
controller quorum."));
} else {
@@ -1616,7 +1609,7 @@ public class KafkaAdminClient extends AdminClient {
private Call makeControllerMetadataCall(long now) {
// Use DescribeCluster here, as specified by KIP-919.
return new Call(true, "describeCluster", calcDeadlineMs(now,
requestTimeoutMs),
- new MetadataUpdateNodeIdProvider()) {
+ new MetadataUpdateNodeIdProvider()) {
@Override
public DescribeClusterRequest.Builder createRequest(int
timeoutMs) {
return new DescribeClusterRequest.Builder(new
DescribeClusterRequestData()
@@ -1659,7 +1652,7 @@ public class KafkaAdminClient extends AdminClient {
// We use MetadataRequest here so that we can continue to support
brokers that are too
// old to handle DescribeCluster.
return new Call(true, "fetchMetadata", calcDeadlineMs(now,
requestTimeoutMs),
- new MetadataUpdateNodeIdProvider()) {
+ new MetadataUpdateNodeIdProvider()) {
@Override
public MetadataRequest.Builder createRequest(int timeoutMs) {
// Since this only requests node information, it's safe to
pass true
@@ -1748,10 +1741,10 @@ public class KafkaAdminClient extends AdminClient {
* Used when a response handler expected a result for some entity but no
result was present.
*/
private static <K, V> void completeUnrealizedFutures(
- Stream<Map.Entry<K, KafkaFutureImpl<V>>> futures,
- Function<K, String> messageFormatter) {
+ Stream<Map.Entry<K, KafkaFutureImpl<V>>> futures,
+ Function<K, String> messageFormatter) {
futures.filter(entry -> !entry.getValue().isDone()).forEach(entry ->
- entry.getValue().completeExceptionally(new
ApiException(messageFormatter.apply(entry.getKey()))));
+ entry.getValue().completeExceptionally(new
ApiException(messageFormatter.apply(entry.getKey()))));
}
/**
@@ -1759,11 +1752,11 @@ public class KafkaAdminClient extends AdminClient {
* the initial error back to the caller if the request timed out.
*/
private static <K, V> void maybeCompleteQuotaExceededException(
- boolean shouldRetryOnQuotaViolation,
- Throwable throwable,
- Map<K, KafkaFutureImpl<V>> futures,
- Map<K, ThrottlingQuotaExceededException> quotaExceededExceptions,
- int throttleTimeDelta) {
+ boolean shouldRetryOnQuotaViolation,
+ Throwable throwable,
+ Map<K, KafkaFutureImpl<V>> futures,
+ Map<K, ThrottlingQuotaExceededException> quotaExceededExceptions,
+ int throttleTimeDelta) {
if (shouldRetryOnQuotaViolation && throwable instanceof
TimeoutException) {
quotaExceededExceptions.forEach((key, value) ->
futures.get(key).completeExceptionally(
new ThrottlingQuotaExceededException(
@@ -2042,10 +2035,10 @@ public class KafkaAdminClient extends AdminClient {
@Override
DeleteTopicsRequest.Builder createRequest(int timeoutMs) {
return new DeleteTopicsRequest.Builder(
- new DeleteTopicsRequestData()
- .setTopics(topicIds.stream().map(
- topic -> new
DeleteTopicState().setTopicId(topic)).collect(Collectors.toList()))
- .setTimeoutMs(timeoutMs));
+ new DeleteTopicsRequestData()
+ .setTopics(topicIds.stream().map(
+ topic -> new
DeleteTopicState().setTopicId(topic)).collect(Collectors.toList()))
+ .setTimeoutMs(timeoutMs));
}
@Override
@@ -2065,7 +2058,7 @@ public class KafkaAdminClient extends AdminClient {
if (error.isFailure()) {
if (error.is(Errors.THROTTLING_QUOTA_EXCEEDED)) {
ThrottlingQuotaExceededException
quotaExceededException = new ThrottlingQuotaExceededException(
- response.throttleTimeMs(),
error.messageWithFallback());
+ response.throttleTimeMs(),
error.messageWithFallback());
if (options.shouldRetryOnQuotaViolation()) {
retryTopics.add(result.topicId());
retryTopicQuotaExceededExceptions.put(result.topicId(), quotaExceededException);
@@ -2088,7 +2081,7 @@ public class KafkaAdminClient extends AdminClient {
} else {
final long now = time.milliseconds();
final Call call = getDeleteTopicsWithIdsCall(options,
futures, retryTopics,
- retryTopicQuotaExceededExceptions, now, deadline);
+ retryTopicQuotaExceededExceptions, now, deadline);
runnable.call(call, now);
}
}
@@ -2098,7 +2091,7 @@ public class KafkaAdminClient extends AdminClient {
// If there were any topics retries due to a quota exceeded
exception, we propagate
// the initial error back to the caller if the request timed
out.
                maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(),
-                throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now));
+                    throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now));
// Fail all the other remaining futures
completeAllExceptionally(futures.values(), throwable);
}
@@ -2285,7 +2278,7 @@ public class KafkaAdminClient extends AdminClient {
}
            if (partiallyFinishedTopicDescription != null &&
-                (responseCursor == null || !responseCursor.topicName().equals(partiallyFinishedTopicDescription.name()))) {
+                    (responseCursor == null || !responseCursor.topicName().equals(partiallyFinishedTopicDescription.name()))) {
// We can't simply check nextTopicDescription != null here
to close the partiallyFinishedTopicDescription.
// Because the responseCursor topic may not show in the
response.
String topicName =
partiallyFinishedTopicDescription.name();
@@ -2368,7 +2361,7 @@ public class KafkaAdminClient extends AdminClient {
if (topicIdIsUnrepresentable(topicId)) {
KafkaFutureImpl<TopicDescription> future = new
KafkaFutureImpl<>();
future.completeExceptionally(new InvalidTopicException("The
given topic id '" +
- topicId + "' cannot be represented in a request."));
+ topicId + "' cannot be represented in a request."));
topicFutures.put(topicId, future);
} else if (!topicFutures.containsKey(topicId)) {
topicFutures.put(topicId, new KafkaFutureImpl<>());
@@ -2377,14 +2370,14 @@ public class KafkaAdminClient extends AdminClient {
}
final long now = time.milliseconds();
Call call = new Call("describeTopicsWithIds", calcDeadlineMs(now,
options.timeoutMs()),
- new LeastLoadedNodeProvider()) {
+ new LeastLoadedNodeProvider()) {
@Override
MetadataRequest.Builder createRequest(int timeoutMs) {
                return new MetadataRequest.Builder(new MetadataRequestData()
-                        .setTopics(convertTopicIdsToMetadataRequestTopic(topicIdsList))
-                        .setAllowAutoTopicCreation(false)
-                        .setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations()));
+                    .setTopics(convertTopicIdsToMetadataRequestTopic(topicIdsList))
+                    .setAllowAutoTopicCreation(false)
+                    .setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations()));
}
@Override
@@ -2446,8 +2439,8 @@ public class KafkaAdminClient extends AdminClient {
List<TopicPartitionInfo> partitions = new
ArrayList<>(partitionInfos.size());
for (PartitionInfo partitionInfo : partitionInfos) {
TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(
- partitionInfo.partition(), leader(partitionInfo),
Arrays.asList(partitionInfo.replicas()),
- Arrays.asList(partitionInfo.inSyncReplicas()));
+ partitionInfo.partition(), leader(partitionInfo),
Arrays.asList(partitionInfo.replicas()),
+ Arrays.asList(partitionInfo.inSyncReplicas()));
partitions.add(topicPartitionInfo);
}
partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
@@ -2482,7 +2475,7 @@ public class KafkaAdminClient extends AdminClient {
return new DescribeClusterRequest.Builder(new
DescribeClusterRequestData()
.setIncludeClusterAuthorizedOperations(options.includeAuthorizedOperations())
.setEndpointType(metadataManager.usingBootstrapControllers() ?
- EndpointType.CONTROLLER.id() :
EndpointType.BROKER.id())
+ EndpointType.CONTROLLER.id() :
EndpointType.BROKER.id())
.setIncludeFencedBrokers(options.includeFencedBrokers()));
} else {
// Since this only requests node information, it's safe to
pass true for allowAutoTopicCreation (and it
@@ -2566,7 +2559,7 @@ public class KafkaAdminClient extends AdminClient {
if (filter.isUnknown()) {
KafkaFutureImpl<Collection<AclBinding>> future = new
KafkaFutureImpl<>();
future.completeExceptionally(new InvalidRequestException("The
AclBindingFilter " +
- "must not contain UNKNOWN elements."));
+ "must not contain UNKNOWN elements."));
return new DescribeAclsResult(future);
}
final long now = time.milliseconds();
@@ -2766,15 +2759,15 @@ public class KafkaAdminClient extends AdminClient {
if (future == null) {
if (node != null) {
log.warn("The config {} in the response from
node {} is not in the request",
- configResource, node);
+ configResource, node);
} else {
log.warn("The config {} in the response from
the least loaded broker is not in the request",
- configResource);
+ configResource);
}
} else {
                        if (describeConfigsResult.errorCode() != Errors.NONE.code()) {
                            future.completeExceptionally(Errors.forCode(describeConfigsResult.errorCode())
-                                    .exception(describeConfigsResult.errorMessage()));
+                                .exception(describeConfigsResult.errorMessage()));
} else {
future.complete(describeConfigResult(describeConfigsResult));
}
@@ -2810,15 +2803,15 @@ public class KafkaAdminClient extends AdminClient {
    private Config describeConfigResult(DescribeConfigsResponseData.DescribeConfigsResult describeConfigsResult) {
        return new Config(describeConfigsResult.configs().stream().map(config -> new ConfigEntry(
-            config.name(),
-            config.value(),
-            DescribeConfigsResponse.ConfigSource.forId(config.configSource()).source(),
-            config.isSensitive(),
-            config.readOnly(),
-            (config.synonyms().stream().map(synonym -> new ConfigEntry.ConfigSynonym(synonym.name(), synonym.value(),
-                DescribeConfigsResponse.ConfigSource.forId(synonym.source()).source()))).collect(Collectors.toList()),
-            DescribeConfigsResponse.ConfigType.forId(config.configType()).type(),
-            config.documentation()
+                config.name(),
+                config.value(),
+                DescribeConfigsResponse.ConfigSource.forId(config.configSource()).source(),
+                config.isSensitive(),
+                config.readOnly(),
+                (config.synonyms().stream().map(synonym -> new ConfigEntry.ConfigSynonym(synonym.name(), synonym.value(),
+                    DescribeConfigsResponse.ConfigSource.forId(synonym.source()).source()))).collect(Collectors.toList()),
+                DescribeConfigsResponse.ConfigType.forId(config.configType()).type(),
+                config.documentation()
)).collect(Collectors.toList()));
}
@@ -2930,7 +2923,7 @@ public class KafkaAdminClient extends AdminClient {
futures.put(replica, new KafkaFutureImpl<>());
Map<Integer, AlterReplicaLogDirsRequestData> replicaAssignmentByBroker
= new HashMap<>();
- for (Map.Entry<TopicPartitionReplica, String> entry:
replicaAssignment.entrySet()) {
+ for (Map.Entry<TopicPartitionReplica, String> entry :
replicaAssignment.entrySet()) {
TopicPartitionReplica replica = entry.getKey();
String logDir = entry.getValue();
int brokerId = replica.brokerId();
@@ -2951,7 +2944,7 @@ public class KafkaAdminClient extends AdminClient {
}
final long now = time.milliseconds();
- for (Map.Entry<Integer, AlterReplicaLogDirsRequestData> entry:
replicaAssignmentByBroker.entrySet()) {
+ for (Map.Entry<Integer, AlterReplicaLogDirsRequestData> entry :
replicaAssignmentByBroker.entrySet()) {
final int brokerId = entry.getKey();
final AlterReplicaLogDirsRequestData assignment = entry.getValue();
@@ -2966,15 +2959,15 @@ public class KafkaAdminClient extends AdminClient {
@Override
public void handleResponse(AbstractResponse abstractResponse) {
AlterReplicaLogDirsResponse response =
(AlterReplicaLogDirsResponse) abstractResponse;
- for (AlterReplicaLogDirTopicResult topicResult:
response.data().results()) {
- for (AlterReplicaLogDirPartitionResult
partitionResult: topicResult.partitions()) {
+ for (AlterReplicaLogDirTopicResult topicResult :
response.data().results()) {
+ for (AlterReplicaLogDirPartitionResult partitionResult
: topicResult.partitions()) {
TopicPartitionReplica replica = new
TopicPartitionReplica(
- topicResult.topicName(),
partitionResult.partitionIndex(), brokerId);
+ topicResult.topicName(),
partitionResult.partitionIndex(), brokerId);
KafkaFutureImpl<Void> future =
futures.get(replica);
if (future == null) {
log.warn("The partition {} in the response
from broker {} is not in the request",
- new
TopicPartition(topicResult.topicName(), partitionResult.partitionIndex()),
- brokerId);
+ new
TopicPartition(topicResult.topicName(), partitionResult.partitionIndex()),
+ brokerId);
} else if (partitionResult.errorCode() ==
Errors.NONE.code()) {
future.complete(null);
} else {
@@ -2986,8 +2979,9 @@ public class KafkaAdminClient extends AdminClient {
completeUnrealizedFutures(
futures.entrySet().stream().filter(entry ->
entry.getKey().brokerId() == brokerId),
replica -> "The response from broker " + brokerId +
- " did not contain a result for replica " +
replica);
+ " did not contain a result for replica " +
replica);
}
+
@Override
void handleFailure(Throwable throwable) {
// Only completes the futures of brokerId
@@ -3030,11 +3024,12 @@ public class KafkaAdminClient extends AdminClient {
} else {
// Up to v3 DescribeLogDirsResponse did not have an
error code field, hence it defaults to None
Errors error = response.data().errorCode() ==
Errors.NONE.code()
- ? Errors.CLUSTER_AUTHORIZATION_FAILED
- : Errors.forCode(response.data().errorCode());
+ ? Errors.CLUSTER_AUTHORIZATION_FAILED
+ : Errors.forCode(response.data().errorCode());
future.completeExceptionally(error.exception());
}
}
+
@Override
void handleFailure(Throwable throwable) {
future.completeExceptionally(throwable);
@@ -3052,15 +3047,15 @@ public class KafkaAdminClient extends AdminClient {
for (DescribeLogDirsResponseData.DescribeLogDirsTopic t :
logDirResult.topics()) {
for (DescribeLogDirsResponseData.DescribeLogDirsPartition p :
t.partitions()) {
replicaInfoMap.put(
- new TopicPartition(t.name(), p.partitionIndex()),
- new ReplicaInfo(p.partitionSize(), p.offsetLag(),
p.isFutureKey()));
+ new TopicPartition(t.name(), p.partitionIndex()),
+ new ReplicaInfo(p.partitionSize(), p.offsetLag(),
p.isFutureKey()));
}
}
result.put(logDirResult.logDir(), new LogDirDescription(
- Errors.forCode(logDirResult.errorCode()).exception(),
- replicaInfoMap,
- logDirResult.totalBytes(),
- logDirResult.usableBytes()));
+ Errors.forCode(logDirResult.errorCode()).exception(),
+ replicaInfoMap,
+ logDirResult.totalBytes(),
+ logDirResult.usableBytes()));
}
return result;
}
@@ -3075,7 +3070,7 @@ public class KafkaAdminClient extends AdminClient {
Map<Integer, DescribeLogDirsRequestData> partitionsByBroker = new
HashMap<>();
- for (TopicPartitionReplica replica: replicas) {
+ for (TopicPartitionReplica replica : replicas) {
DescribeLogDirsRequestData requestData =
partitionsByBroker.computeIfAbsent(replica.brokerId(),
brokerId -> new DescribeLogDirsRequestData());
DescribableLogDirTopic describableLogDirTopic =
requestData.topics().find(replica.topic());
@@ -3083,7 +3078,7 @@ public class KafkaAdminClient extends AdminClient {
List<Integer> partitions = new ArrayList<>();
partitions.add(replica.partition());
describableLogDirTopic = new
DescribableLogDirTopic().setTopic(replica.topic())
- .setPartitions(partitions);
+ .setPartitions(partitions);
requestData.topics().add(describableLogDirTopic);
} else {
describableLogDirTopic.partitions().add(replica.partition());
@@ -3091,11 +3086,11 @@ public class KafkaAdminClient extends AdminClient {
}
final long now = time.milliseconds();
- for (Map.Entry<Integer, DescribeLogDirsRequestData> entry:
partitionsByBroker.entrySet()) {
+ for (Map.Entry<Integer, DescribeLogDirsRequestData> entry :
partitionsByBroker.entrySet()) {
final int brokerId = entry.getKey();
final DescribeLogDirsRequestData topicPartitions =
entry.getValue();
final Map<TopicPartition, ReplicaLogDirInfo>
replicaDirInfoByPartition = new HashMap<>();
- for (DescribableLogDirTopic topicPartition:
topicPartitions.topics()) {
+ for (DescribableLogDirTopic topicPartition :
topicPartitions.topics()) {
for (Integer partitionId : topicPartition.partitions()) {
replicaDirInfoByPartition.put(new
TopicPartition(topicPartition.topic(), partitionId), new ReplicaLogDirInfo());
}
@@ -3113,7 +3108,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
public void handleResponse(AbstractResponse abstractResponse) {
DescribeLogDirsResponse response =
(DescribeLogDirsResponse) abstractResponse;
- for (Map.Entry<String, LogDirDescription> responseEntry:
logDirDescriptions(response).entrySet()) {
+ for (Map.Entry<String, LogDirDescription> responseEntry :
logDirDescriptions(response).entrySet()) {
String logDir = responseEntry.getKey();
LogDirDescription logDirInfo =
responseEntry.getValue();
@@ -3124,7 +3119,7 @@ public class KafkaAdminClient extends AdminClient {
handleFailure(new IllegalStateException(
"The error " +
logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in
the response from broker " + brokerId + " is illegal"));
- for (Map.Entry<TopicPartition, ReplicaInfo>
replicaInfoEntry: logDirInfo.replicaInfos().entrySet()) {
+ for (Map.Entry<TopicPartition, ReplicaInfo>
replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) {
TopicPartition tp = replicaInfoEntry.getKey();
ReplicaInfo replicaInfo =
replicaInfoEntry.getValue();
ReplicaLogDirInfo replicaLogDirInfo =
replicaDirInfoByPartition.get(tp);
@@ -3132,24 +3127,25 @@ public class KafkaAdminClient extends AdminClient {
log.warn("Server response from broker {}
mentioned unknown partition {}", brokerId, tp);
} else if (replicaInfo.isFuture()) {
replicaDirInfoByPartition.put(tp, new
ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(),
-
replicaLogDirInfo.getCurrentReplicaOffsetLag(),
-
logDir,
-
replicaInfo.offsetLag()));
+
replicaLogDirInfo.getCurrentReplicaOffsetLag(),
+ logDir,
+ replicaInfo.offsetLag()));
} else {
replicaDirInfoByPartition.put(tp, new
ReplicaLogDirInfo(logDir,
-
replicaInfo.offsetLag(),
-
replicaLogDirInfo.getFutureReplicaLogDir(),
-
replicaLogDirInfo.getFutureReplicaOffsetLag()));
+ replicaInfo.offsetLag(),
+ replicaLogDirInfo.getFutureReplicaLogDir(),
+
replicaLogDirInfo.getFutureReplicaOffsetLag()));
}
}
}
- for (Map.Entry<TopicPartition, ReplicaLogDirInfo> entry:
replicaDirInfoByPartition.entrySet()) {
+ for (Map.Entry<TopicPartition, ReplicaLogDirInfo> entry :
replicaDirInfoByPartition.entrySet()) {
TopicPartition tp = entry.getKey();
KafkaFutureImpl<ReplicaLogDirInfo> future =
futures.get(new TopicPartitionReplica(tp.topic(), tp.partition(), brokerId));
future.complete(entry.getValue());
}
}
+
@Override
void handleFailure(Throwable throwable) {
completeAllExceptionally(futures.values(), throwable);
@@ -3285,8 +3281,8 @@ public class KafkaAdminClient extends AdminClient {
List<CreatableRenewers> renewers = new ArrayList<>();
for (KafkaPrincipal principal : options.renewers()) {
renewers.add(new CreatableRenewers()
- .setPrincipalName(principal.getName())
- .setPrincipalType(principal.getPrincipalType()));
+ .setPrincipalName(principal.getName())
+ .setPrincipalType(principal.getPrincipalType()));
}
runnable.call(new Call("createDelegationToken", calcDeadlineMs(now,
options.timeoutMs()),
new LeastLoadedNodeProvider()) {
@@ -3310,7 +3306,7 @@ public class KafkaAdminClient extends AdminClient {
delegationTokenFuture.completeExceptionally(response.error().exception());
} else {
CreateDelegationTokenResponseData data = response.data();
- TokenInformation tokenInfo = new
TokenInformation(data.tokenId(), new KafkaPrincipal(data.principalType(),
data.principalName()),
+ TokenInformation tokenInfo = new
TokenInformation(data.tokenId(), new KafkaPrincipal(data.principalType(),
data.principalName()),
new KafkaPrincipal(data.tokenRequesterPrincipalType(),
data.tokenRequesterPrincipalName()),
options.renewers(), data.issueTimestampMs(),
data.maxTimestampMs(), data.expiryTimestampMs());
DelegationToken token = new DelegationToken(tokenInfo,
data.hmac());
@@ -3329,7 +3325,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
public RenewDelegationTokenResult renewDelegationToken(final byte[] hmac,
final RenewDelegationTokenOptions options) {
- final KafkaFutureImpl<Long> expiryTimeFuture = new
KafkaFutureImpl<>();
+ final KafkaFutureImpl<Long> expiryTimeFuture = new KafkaFutureImpl<>();
final long now = time.milliseconds();
runnable.call(new Call("renewDelegationToken", calcDeadlineMs(now,
options.timeoutMs()),
new LeastLoadedNodeProvider()) {
@@ -3337,7 +3333,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
RenewDelegationTokenRequest.Builder createRequest(int timeoutMs) {
return new RenewDelegationTokenRequest.Builder(
- new RenewDelegationTokenRequestData()
+ new RenewDelegationTokenRequestData()
.setHmac(hmac)
.setRenewPeriodMs(options.renewTimePeriodMs()));
}
@@ -3363,7 +3359,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
public ExpireDelegationTokenResult expireDelegationToken(final byte[]
hmac, final ExpireDelegationTokenOptions options) {
- final KafkaFutureImpl<Long> expiryTimeFuture = new
KafkaFutureImpl<>();
+ final KafkaFutureImpl<Long> expiryTimeFuture = new KafkaFutureImpl<>();
final long now = time.milliseconds();
runnable.call(new Call("expireDelegationToken", calcDeadlineMs(now,
options.timeoutMs()),
new LeastLoadedNodeProvider()) {
@@ -3371,9 +3367,9 @@ public class KafkaAdminClient extends AdminClient {
@Override
ExpireDelegationTokenRequest.Builder createRequest(int timeoutMs) {
return new ExpireDelegationTokenRequest.Builder(
-                        new ExpireDelegationTokenRequestData()
-                                .setHmac(hmac)
-                                .setExpiryTimePeriodMs(options.expiryTimePeriodMs()));
+                    new ExpireDelegationTokenRequestData()
+                        .setHmac(hmac)
+                        .setExpiryTimePeriodMs(options.expiryTimePeriodMs()));
}
@Override
@@ -3397,7 +3393,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
public DescribeDelegationTokenResult describeDelegationToken(final
DescribeDelegationTokenOptions options) {
- final KafkaFutureImpl<List<DelegationToken>> tokensFuture = new
KafkaFutureImpl<>();
+ final KafkaFutureImpl<List<DelegationToken>> tokensFuture = new
KafkaFutureImpl<>();
final long now = time.milliseconds();
runnable.call(new Call("describeDelegationToken", calcDeadlineMs(now,
options.timeoutMs()),
new LeastLoadedNodeProvider()) {
@@ -3579,11 +3575,11 @@ public class KafkaAdminClient extends AdminClient {
public DescribeConsumerGroupsResult describeConsumerGroups(final
Collection<String> groupIds,
final
DescribeConsumerGroupsOptions options) {
SimpleAdminApiFuture<CoordinatorKey, ConsumerGroupDescription> future =
- DescribeConsumerGroupsHandler.newFuture(groupIds);
+ DescribeConsumerGroupsHandler.newFuture(groupIds);
DescribeConsumerGroupsHandler handler = new
DescribeConsumerGroupsHandler(options.includeAuthorizedOperations(),
logContext);
invokeDriver(handler, future, options.timeoutMs);
return new
DescribeConsumerGroupsResult(future.all().entrySet().stream()
- .collect(Collectors.toMap(entry -> entry.getKey().idValue,
Map.Entry::getValue)));
+ .collect(Collectors.toMap(entry -> entry.getKey().idValue,
Map.Entry::getValue)));
}
@Deprecated
@@ -3660,13 +3656,13 @@ public class KafkaAdminClient extends AdminClient {
@Override
ListGroupsRequest.Builder createRequest(int timeoutMs)
{
List<String> states = options.groupStates()
- .stream()
- .map(GroupState::toString)
- .collect(Collectors.toList());
+ .stream()
+ .map(GroupState::toString)
+ .collect(Collectors.toList());
List<String> groupTypes = options.types()
- .stream()
- .map(GroupType::toString)
- .collect(Collectors.toList());
+ .stream()
+ .map(GroupType::toString)
+ .collect(Collectors.toList());
return new ListGroupsRequest.Builder(new
ListGroupsRequestData()
.setStatesFilter(states)
.setTypesFilter(groupTypes)
@@ -3678,17 +3674,17 @@ public class KafkaAdminClient extends AdminClient {
if
(protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty())
{
final String groupId = group.groupId();
final Optional<GroupState> groupState =
group.groupState().isEmpty()
- ? Optional.empty()
- :
Optional.of(GroupState.parse(group.groupState()));
+ ? Optional.empty()
+ :
Optional.of(GroupState.parse(group.groupState()));
final Optional<GroupType> type =
group.groupType().isEmpty()
- ? Optional.empty()
- :
Optional.of(GroupType.parse(group.groupType()));
+ ? Optional.empty()
+ :
Optional.of(GroupType.parse(group.groupType()));
final ConsumerGroupListing groupListing = new
ConsumerGroupListing(
- groupId,
- groupState,
- type,
- protocolType.isEmpty()
- );
+ groupId,
+ groupState,
+ type,
+ protocolType.isEmpty()
+ );
results.addListing(groupListing);
}
}
@@ -3736,7 +3732,7 @@ public class KafkaAdminClient extends AdminClient {
public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String,
ListConsumerGroupOffsetsSpec> groupSpecs,
ListConsumerGroupOffsetsOptions options) {
SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition,
OffsetAndMetadata>> future =
- ListConsumerGroupOffsetsHandler.newFuture(groupSpecs.keySet());
+ ListConsumerGroupOffsetsHandler.newFuture(groupSpecs.keySet());
ListConsumerGroupOffsetsHandler handler =
new ListConsumerGroupOffsetsHandler(groupSpecs,
options.requireStable(), logContext);
invokeDriver(handler, future, options.timeoutMs);
@@ -3745,7 +3741,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
public ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String,
ListStreamsGroupOffsetsSpec> groupSpecs,
-
ListStreamsGroupOffsetsOptions options) {
+
ListStreamsGroupOffsetsOptions options) {
Map<String, ListConsumerGroupOffsetsSpec> consumerGroupSpecs =
groupSpecs.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
@@ -3760,11 +3756,11 @@ public class KafkaAdminClient extends AdminClient {
@Override
public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String>
groupIds, DeleteConsumerGroupsOptions options) {
SimpleAdminApiFuture<CoordinatorKey, Void> future =
- DeleteConsumerGroupsHandler.newFuture(groupIds);
+ DeleteConsumerGroupsHandler.newFuture(groupIds);
DeleteConsumerGroupsHandler handler = new
DeleteConsumerGroupsHandler(logContext);
invokeDriver(handler, future, options.timeoutMs);
return new DeleteConsumerGroupsResult(future.all().entrySet().stream()
- .collect(Collectors.toMap(entry -> entry.getKey().idValue,
Map.Entry::getValue)));
+ .collect(Collectors.toMap(entry -> entry.getKey().idValue,
Map.Entry::getValue)));
}
@Override
@@ -3776,11 +3772,11 @@ public class KafkaAdminClient extends AdminClient {
@Override
public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(
- String groupId,
- Set<TopicPartition> partitions,
- DeleteConsumerGroupOffsetsOptions options) {
+ String groupId,
+ Set<TopicPartition> partitions,
+ DeleteConsumerGroupOffsetsOptions options) {
SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>>
future =
- DeleteConsumerGroupOffsetsHandler.newFuture(groupId);
+ DeleteConsumerGroupOffsetsHandler.newFuture(groupId);
DeleteConsumerGroupOffsetsHandler handler = new
DeleteConsumerGroupOffsetsHandler(groupId, partitions, logContext);
invokeDriver(handler, future, options.timeoutMs);
return new
DeleteConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)),
partitions);
@@ -3800,11 +3796,11 @@ public class KafkaAdminClient extends AdminClient {
public DescribeShareGroupsResult describeShareGroups(final
Collection<String> groupIds,
final
DescribeShareGroupsOptions options) {
SimpleAdminApiFuture<CoordinatorKey, ShareGroupDescription> future =
- DescribeShareGroupsHandler.newFuture(groupIds);
+ DescribeShareGroupsHandler.newFuture(groupIds);
DescribeShareGroupsHandler handler = new
DescribeShareGroupsHandler(options.includeAuthorizedOperations(), logContext);
invokeDriver(handler, future, options.timeoutMs);
return new DescribeShareGroupsResult(future.all().entrySet().stream()
- .collect(Collectors.toMap(entry -> entry.getKey().idValue,
Map.Entry::getValue)));
+ .collect(Collectors.toMap(entry -> entry.getKey().idValue,
Map.Entry::getValue)));
}
@Override
@@ -3871,13 +3867,13 @@ public class KafkaAdminClient extends AdminClient {
@Override
public ElectLeadersResult electLeaders(
- final ElectionType electionType,
- final Set<TopicPartition> topicPartitions,
- ElectLeadersOptions options) {
+ final ElectionType electionType,
+ final Set<TopicPartition> topicPartitions,
+ ElectLeadersOptions options) {
final KafkaFutureImpl<Map<TopicPartition, Optional<Throwable>>>
electionFuture = new KafkaFutureImpl<>();
final long now = time.milliseconds();
runnable.call(new Call("electLeaders", calcDeadlineMs(now,
options.timeoutMs()),
- new ControllerNodeProvider()) {
+ new ControllerNodeProvider()) {
@Override
public ElectLeadersRequest.Builder createRequest(int timeoutMs) {
@@ -3910,8 +3906,8 @@ public class KafkaAdminClient extends AdminClient {
@Override
public AlterPartitionReassignmentsResult alterPartitionReassignments(
- Map<TopicPartition, Optional<NewPartitionReassignment>>
reassignments,
- AlterPartitionReassignmentsOptions options) {
+ Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments,
+ AlterPartitionReassignmentsOptions options) {
final Map<TopicPartition, KafkaFutureImpl<Void>> futures = new
HashMap<>();
final Map<String, Map<Integer, Optional<NewPartitionReassignment>>>
topicsToReassignments = new TreeMap<>();
for (Map.Entry<TopicPartition, Optional<NewPartitionReassignment>>
entry : reassignments.entrySet()) {
@@ -3924,13 +3920,13 @@ public class KafkaAdminClient extends AdminClient {
if (topicNameIsUnrepresentable(topic)) {
future.completeExceptionally(new InvalidTopicException("The
given topic name '" +
- topic + "' cannot be represented in a request."));
+ topic + "' cannot be represented in a request."));
} else if (topicPartition.partition() < 0) {
future.completeExceptionally(new InvalidTopicException("The
given partition index " +
- topicPartition.partition() + " is not valid."));
+ topicPartition.partition() + " is not valid."));
} else {
Map<Integer, Optional<NewPartitionReassignment>>
partitionReassignments =
- topicsToReassignments.get(topicPartition.topic());
+ topicsToReassignments.get(topicPartition.topic());
if (partitionReassignments == null) {
partitionReassignments = new TreeMap<>();
topicsToReassignments.put(topic, partitionReassignments);
@@ -3942,32 +3938,32 @@ public class KafkaAdminClient extends AdminClient {
final long now = time.milliseconds();
Call call = new Call("alterPartitionReassignments",
calcDeadlineMs(now, options.timeoutMs()),
- new ControllerNodeProvider(true)) {
+ new ControllerNodeProvider(true)) {
@Override
public AlterPartitionReassignmentsRequest.Builder
createRequest(int timeoutMs) {
AlterPartitionReassignmentsRequestData data =
- new AlterPartitionReassignmentsRequestData();
+ new AlterPartitionReassignmentsRequestData();
for (Map.Entry<String, Map<Integer,
Optional<NewPartitionReassignment>>> entry :
- topicsToReassignments.entrySet()) {
+ topicsToReassignments.entrySet()) {
String topicName = entry.getKey();
Map<Integer, Optional<NewPartitionReassignment>>
partitionsToReassignments = entry.getValue();
List<ReassignablePartition> reassignablePartitions = new
ArrayList<>();
for (Map.Entry<Integer,
Optional<NewPartitionReassignment>> partitionEntry :
- partitionsToReassignments.entrySet()) {
+ partitionsToReassignments.entrySet()) {
int partitionIndex = partitionEntry.getKey();
Optional<NewPartitionReassignment> reassignment =
partitionEntry.getValue();
ReassignablePartition reassignablePartition = new
ReassignablePartition()
- .setPartitionIndex(partitionIndex)
-
.setReplicas(reassignment.map(NewPartitionReassignment::targetReplicas).orElse(null));
+ .setPartitionIndex(partitionIndex)
+
.setReplicas(reassignment.map(NewPartitionReassignment::targetReplicas).orElse(null));
reassignablePartitions.add(reassignablePartition);
}
ReassignableTopic reassignableTopic = new
ReassignableTopic()
- .setName(topicName)
- .setPartitions(reassignablePartitions);
+ .setName(topicName)
+ .setPartitions(reassignablePartitions);
data.topics().add(reassignableTopic);
}
data.setTimeoutMs(timeoutMs);
@@ -3994,8 +3990,8 @@ public class KafkaAdminClient extends AdminClient {
String topicName = topicResponse.name();
for (ReassignablePartitionResponse partition :
topicResponse.partitions()) {
errors.put(
- new TopicPartition(topicName,
partition.partitionIndex()),
- new ApiError(topLevelError,
response.data().errorMessage()).exception()
+ new TopicPartition(topicName,
partition.partitionIndex()),
+ new ApiError(topLevelError,
response.data().errorMessage()).exception()
);
receivedResponsesCount += 1;
}
@@ -4067,10 +4063,10 @@ public class KafkaAdminClient extends AdminClient {
int partition = tp.partition();
if (topicNameIsUnrepresentable(topic)) {
partitionReassignmentsFuture.completeExceptionally(new
InvalidTopicException("The given topic name '"
- + topic + "' cannot be represented in a
request."));
+ + topic + "' cannot be represented in a request."));
} else if (partition < 0) {
partitionReassignmentsFuture.completeExceptionally(new
InvalidTopicException("The given partition index " +
- partition + " is not valid."));
+ partition + " is not valid."));
}
if (partitionReassignmentsFuture.isCompletedExceptionally())
return new
ListPartitionReassignmentsResult(partitionReassignmentsFuture);
@@ -4159,7 +4155,7 @@ public class KafkaAdminClient extends AdminClient {
*/
private Integer nodeFor(ConfigResource resource) {
if ((resource.type() == ConfigResource.Type.BROKER &&
!resource.isDefault())
- || resource.type() == ConfigResource.Type.BROKER_LOGGER) {
+ || resource.type() == ConfigResource.Type.BROKER_LOGGER) {
return Integer.valueOf(resource.name());
} else {
return null;
@@ -4175,8 +4171,8 @@ public class KafkaAdminClient extends AdminClient {
} else {
List<MemberIdentity> membersToRemove =
res.members().stream().map(member ->
member.groupInstanceId().map(id -> new
MemberIdentity().setGroupInstanceId(id))
- .orElseGet(() -> new
MemberIdentity().setMemberId(member.consumerId()))
- .setReason(reason)
+ .orElseGet(() -> new
MemberIdentity().setMemberId(member.consumerId()))
+ .setReason(reason)
).collect(Collectors.toList());
future.complete(membersToRemove);
@@ -4209,7 +4205,7 @@ public class KafkaAdminClient extends AdminClient {
            DEFAULT_LEAVE_GROUP_REASON : JoinGroupRequest.maybeTruncateReason(options.reason());
        final SimpleAdminApiFuture<CoordinatorKey, Map<MemberIdentity, Errors>> adminFuture =
-                RemoveMembersFromConsumerGroupHandler.newFuture(groupId);
+            RemoveMembersFromConsumerGroupHandler.newFuture(groupId);
        KafkaFutureImpl<List<MemberIdentity>> memFuture;
        if (options.removeAll()) {
@@ -4217,8 +4213,8 @@ public class KafkaAdminClient extends AdminClient {
        } else {
            memFuture = new KafkaFutureImpl<>();
            memFuture.complete(options.members().stream()
-                .map(m -> m.toMemberIdentity().setReason(reason))
-                .collect(Collectors.toList()));
+                .map(m -> m.toMemberIdentity().setReason(reason))
+                .collect(Collectors.toList()));
        }
memFuture.whenComplete((members, ex) -> {
@@ -4240,7 +4236,7 @@ public class KafkaAdminClient extends AdminClient {
        AlterConsumerGroupOffsetsOptions options
    ) {
        SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Errors>> future =
-                AlterConsumerGroupOffsetsHandler.newFuture(groupId);
+            AlterConsumerGroupOffsetsHandler.newFuture(groupId);
        AlterConsumerGroupOffsetsHandler handler = new AlterConsumerGroupOffsetsHandler(groupId, offsets, logContext);
        invokeDriver(handler, future, options.timeoutMs);
        return new AlterConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)));
@@ -4275,24 +4271,24 @@ public class KafkaAdminClient extends AdminClient {
        final long now = time.milliseconds();
        runnable.call(new Call("describeClientQuotas", calcDeadlineMs(now, options.timeoutMs()),
-            new LeastLoadedNodeProvider()) {
+                new LeastLoadedNodeProvider()) {
-            @Override
-            DescribeClientQuotasRequest.Builder createRequest(int timeoutMs) {
-                return new DescribeClientQuotasRequest.Builder(filter);
-            }
+            @Override
+            DescribeClientQuotasRequest.Builder createRequest(int timeoutMs) {
+                return new DescribeClientQuotasRequest.Builder(filter);
+            }
-            @Override
-            void handleResponse(AbstractResponse abstractResponse) {
-                DescribeClientQuotasResponse response = (DescribeClientQuotasResponse) abstractResponse;
-                response.complete(future);
-            }
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                DescribeClientQuotasResponse response = (DescribeClientQuotasResponse) abstractResponse;
+                response.complete(future);
+            }
-            @Override
-            void handleFailure(Throwable throwable) {
-                future.completeExceptionally(throwable);
-            }
-        }, now);
+            @Override
+            void handleFailure(Throwable throwable) {
+                future.completeExceptionally(throwable);
+            }
+            }, now);
        return new DescribeClientQuotasResult(future);
    }
@@ -4306,24 +4302,24 @@ public class KafkaAdminClient extends AdminClient {
        final long now = time.milliseconds();
        runnable.call(new Call("alterClientQuotas", calcDeadlineMs(now, options.timeoutMs()),
-            new LeastLoadedNodeProvider()) {
+                new LeastLoadedNodeProvider()) {
-            @Override
-            AlterClientQuotasRequest.Builder createRequest(int timeoutMs) {
-                return new AlterClientQuotasRequest.Builder(entries, options.validateOnly());
-            }
+            @Override
+            AlterClientQuotasRequest.Builder createRequest(int timeoutMs) {
+                return new AlterClientQuotasRequest.Builder(entries, options.validateOnly());
+            }
-            @Override
-            void handleResponse(AbstractResponse abstractResponse) {
-                AlterClientQuotasResponse response = (AlterClientQuotasResponse) abstractResponse;
-                response.complete(futures);
-            }
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                AlterClientQuotasResponse response = (AlterClientQuotasResponse) abstractResponse;
+                response.complete(futures);
+            }
-            @Override
-            void handleFailure(Throwable throwable) {
-                completeAllExceptionally(futures.values(), throwable);
-            }
-        }, now);
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(futures.values(), throwable);
+            }
+            }, now);
        return new AlterClientQuotasResult(Collections.unmodifiableMap(futures));
    }
@@ -4333,7 +4329,7 @@ public class KafkaAdminClient extends AdminClient {
        final KafkaFutureImpl<DescribeUserScramCredentialsResponseData> dataFuture = new KafkaFutureImpl<>();
        final long now = time.milliseconds();
        Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()),
-            new LeastLoadedNodeProvider()) {
+                new LeastLoadedNodeProvider()) {
            @Override
            public DescribeUserScramCredentialsRequest.Builder createRequest(final int timeoutMs) {
                final DescribeUserScramCredentialsRequestData requestData = new DescribeUserScramCredentialsRequestData();
@@ -4379,7 +4375,7 @@ public class KafkaAdminClient extends AdminClient {
        AlterUserScramCredentialsOptions options) {
        final long now = time.milliseconds();
        final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>();
-        for (UserScramCredentialAlteration alteration: alterations) {
+        for (UserScramCredentialAlteration alteration : alterations) {
            futures.put(alteration.user(), new KafkaFutureImpl<>());
        }
        final Map<String, Exception> userIllegalAlterationExceptions = new HashMap<>();
@@ -4403,55 +4399,55 @@ public class KafkaAdminClient extends AdminClient {
        // so keep track of which users are affected by such a failure so we can fail all their alterations later
        final Map<String, Map<ScramMechanism, AlterUserScramCredentialsRequestData.ScramCredentialUpsertion>> userInsertions = new HashMap<>();
        alterations.stream().filter(a -> a instanceof UserScramCredentialUpsertion)
-            .filter(alteration -> !userIllegalAlterationExceptions.containsKey(alteration.user()))
-            .forEach(alteration -> {
-                final String user = alteration.user();
-                if (user == null || user.isEmpty()) {
-                    userIllegalAlterationExceptions.put(alteration.user(), new UnacceptableCredentialException(usernameMustNotBeEmptyMsg));
-                } else {
-                    UserScramCredentialUpsertion upsertion = (UserScramCredentialUpsertion) alteration;
-                    try {
-                        byte[] password = upsertion.password();
-                        if (password == null || password.length == 0) {
-                            userIllegalAlterationExceptions.put(user, new UnacceptableCredentialException(passwordMustNotBeEmptyMsg));
+                .filter(alteration -> !userIllegalAlterationExceptions.containsKey(alteration.user()))
+                .forEach(alteration -> {
+                    final String user = alteration.user();
+                    if (user == null || user.isEmpty()) {
+                        userIllegalAlterationExceptions.put(alteration.user(), new UnacceptableCredentialException(usernameMustNotBeEmptyMsg));
+                    } else {
+                        UserScramCredentialUpsertion upsertion = (UserScramCredentialUpsertion) alteration;
+                        try {
+                            byte[] password = upsertion.password();
+                            if (password == null || password.length == 0) {
+                                userIllegalAlterationExceptions.put(user, new UnacceptableCredentialException(passwordMustNotBeEmptyMsg));
+                            } else {
+                                ScramMechanism mechanism = upsertion.credentialInfo().mechanism();
+                                if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) {
+                                    userIllegalAlterationExceptions.put(user, new UnsupportedSaslMechanismException(unknownScramMechanismMsg));
                        } else {
-                            ScramMechanism mechanism = upsertion.credentialInfo().mechanism();
-                            if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) {
-                                userIllegalAlterationExceptions.put(user, new UnsupportedSaslMechanismException(unknownScramMechanismMsg));
-                            } else {
-                                userInsertions.putIfAbsent(user, new HashMap<>());
-                                userInsertions.get(user).put(mechanism, getScramCredentialUpsertion(upsertion));
-                            }
+                                    userInsertions.putIfAbsent(user, new HashMap<>());
+                                    userInsertions.get(user).put(mechanism, getScramCredentialUpsertion(upsertion));
                                }
-                        } catch (NoSuchAlgorithmException e) {
-                            // we might overwrite an exception from a previous alteration, but we don't really care
-                            // since we just need to mark this user as having at least one illegal alteration
-                            // and make an exception instance available for completing the corresponding future exceptionally
-                            userIllegalAlterationExceptions.put(user, new UnsupportedSaslMechanismException(unknownScramMechanismMsg));
-                        } catch (InvalidKeyException e) {
-                            // generally shouldn't happen since we deal with the empty password case above,
-                            // but we still need to catch/handle it
-                            userIllegalAlterationExceptions.put(user, new UnacceptableCredentialException(e.getMessage(), e));
                        }
+                            } catch (NoSuchAlgorithmException e) {
+                                // we might overwrite an exception from a previous alteration, but we don't really care
+                                // since we just need to mark this user as having at least one illegal alteration
+                                // and make an exception instance available for completing the corresponding future exceptionally
+                                userIllegalAlterationExceptions.put(user, new UnsupportedSaslMechanismException(unknownScramMechanismMsg));
+                            } catch (InvalidKeyException e) {
+                                // generally shouldn't happen since we deal with the empty password case above,
+                                // but we still need to catch/handle it
+                                userIllegalAlterationExceptions.put(user, new UnacceptableCredentialException(e.getMessage(), e));
                        }
-                });
+                    }
+                });
        // submit alterations only for users that do not have an illegal alteration as identified above
        Call call = new Call("alterUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()),
-            new ControllerNodeProvider()) {
+                new ControllerNodeProvider()) {
            @Override
            public AlterUserScramCredentialsRequest.Builder createRequest(int timeoutMs) {
                return new AlterUserScramCredentialsRequest.Builder(
-                    new AlterUserScramCredentialsRequestData().setUpsertions(alterations.stream()
-                        .filter(a -> a instanceof UserScramCredentialUpsertion)
-                        .filter(a -> !userIllegalAlterationExceptions.containsKey(a.user()))
-                        .map(a -> userInsertions.get(a.user()).get(((UserScramCredentialUpsertion) a).credentialInfo().mechanism()))
-                        .collect(Collectors.toList()))
+                    new AlterUserScramCredentialsRequestData().setUpsertions(alterations.stream()
+                            .filter(a -> a instanceof UserScramCredentialUpsertion)
+                            .filter(a -> !userIllegalAlterationExceptions.containsKey(a.user()))
+                            .map(a -> userInsertions.get(a.user()).get(((UserScramCredentialUpsertion) a).credentialInfo().mechanism()))
+                            .collect(Collectors.toList()))
                        .setDeletions(alterations.stream()
-                        .filter(a -> a instanceof UserScramCredentialDeletion)
-                        .filter(a -> !userIllegalAlterationExceptions.containsKey(a.user()))
-                        .map(d -> getScramCredentialDeletion((UserScramCredentialDeletion) d))
-                        .collect(Collectors.toList())));
+                            .filter(a -> a instanceof UserScramCredentialDeletion)
+                            .filter(a -> !userIllegalAlterationExceptions.containsKey(a.user()))
+                            .map(d -> getScramCredentialDeletion((UserScramCredentialDeletion) d))
+                            .collect(Collectors.toList())));
            }
@Override
@@ -4501,10 +4497,10 @@ public class KafkaAdminClient extends AdminClient {
    private static AlterUserScramCredentialsRequestData.ScramCredentialUpsertion getScramCredentialUpsertion(UserScramCredentialUpsertion u) throws InvalidKeyException, NoSuchAlgorithmException {
        AlterUserScramCredentialsRequestData.ScramCredentialUpsertion retval = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion();
        return retval.setName(u.user())
-            .setMechanism(u.credentialInfo().mechanism().type())
-            .setIterations(u.credentialInfo().iterations())
-            .setSalt(u.salt())
-            .setSaltedPassword(getSaltedPassword(u.credentialInfo().mechanism(), u.password(), u.salt(), u.credentialInfo().iterations()));
+            .setMechanism(u.credentialInfo().mechanism().type())
+            .setIterations(u.credentialInfo().iterations())
+            .setSalt(u.salt())
+            .setSaltedPassword(getSaltedPassword(u.credentialInfo().mechanism(), u.password(), u.salt(), u.credentialInfo().iterations()));
    }
    private static AlterUserScramCredentialsRequestData.ScramCredentialDeletion getScramCredentialDeletion(UserScramCredentialDeletion d) {
@@ -4513,7 +4509,7 @@ public class KafkaAdminClient extends AdminClient {
    private static byte[] getSaltedPassword(ScramMechanism publicScramMechanism, byte[] password, byte[] salt, int iterations) throws NoSuchAlgorithmException, InvalidKeyException {
        return new ScramFormatter(org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(publicScramMechanism.mechanismName()))
-            .hi(password, salt, iterations);
+                .hi(password, salt, iterations);
    }
@Override
@@ -4639,7 +4635,7 @@ public class KafkaAdminClient extends AdminClient {
                        }
                        // The server should send back a response for every feature, but we do a sanity check anyway.
                        completeUnrealizedFutures(updateFutures.entrySet().stream(),
-                            feature -> "The controller response did not contain a result for feature " + feature);
+                            feature -> "The controller response did not contain a result for feature " + feature);
                        }
                        break;
                    case NOT_CONTROLLER:
@@ -4670,15 +4666,15 @@ public class KafkaAdminClient extends AdminClient {
        final KafkaFutureImpl<QuorumInfo> future = new KafkaFutureImpl<>();
        final long now = time.milliseconds();
        final Call call = new Call(
-            "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
+            "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
            private QuorumInfo.ReplicaState translateReplicaState(DescribeQuorumResponseData.ReplicaState replica) {
                return new QuorumInfo.ReplicaState(
-                    replica.replicaId(),
-                    replica.replicaDirectoryId() == null ? Uuid.ZERO_UUID : replica.replicaDirectoryId(),
-                    replica.logEndOffset(),
-                    replica.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(replica.lastFetchTimestamp()),
-                    replica.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(replica.lastCaughtUpTimestamp()));
+                    replica.replicaId(),
+                    replica.replicaDirectoryId() == null ? Uuid.ZERO_UUID : replica.replicaDirectoryId(),
+                    replica.logEndOffset(),
+                    replica.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(replica.lastFetchTimestamp()),
+                    replica.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(replica.lastCaughtUpTimestamp()));
            }
            private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition, DescribeQuorumResponseData.NodeCollection nodeCollection) {
@@ -4711,7 +4707,7 @@ public class KafkaAdminClient extends AdminClient {
            @Override
            DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
                return new Builder(DescribeQuorumRequest.singletonRequest(
-                    new TopicPartition(CLUSTER_METADATA_TOPIC_NAME, CLUSTER_METADATA_TOPIC_PARTITION.partition())));
+                    new TopicPartition(CLUSTER_METADATA_TOPIC_NAME, CLUSTER_METADATA_TOPIC_PARTITION.partition())));
            }
@Override
@@ -4723,27 +4719,27 @@ public class KafkaAdminClient extends AdminClient {
                }
                if (quorumResponse.data().topics().size() != 1) {
                    String msg = String.format("DescribeMetadataQuorum received %d topics when 1 was expected",
-                        quorumResponse.data().topics().size());
+                        quorumResponse.data().topics().size());
                    log.debug(msg);
                    throw new UnknownServerException(msg);
                }
                DescribeQuorumResponseData.TopicData topic = quorumResponse.data().topics().get(0);
                if (!topic.topicName().equals(CLUSTER_METADATA_TOPIC_NAME)) {
                    String msg = String.format("DescribeMetadataQuorum received a topic with name %s when %s was expected",
-                        topic.topicName(), CLUSTER_METADATA_TOPIC_NAME);
+                        topic.topicName(), CLUSTER_METADATA_TOPIC_NAME);
                    log.debug(msg);
                    throw new UnknownServerException(msg);
                }
                if (topic.partitions().size() != 1) {
                    String msg = String.format("DescribeMetadataQuorum received a topic %s with %d partitions when 1 was expected",
-                        topic.topicName(), topic.partitions().size());
+                        topic.topicName(), topic.partitions().size());
                    log.debug(msg);
                    throw new UnknownServerException(msg);
                }
                DescribeQuorumResponseData.PartitionData partition = topic.partitions().get(0);
                if (partition.partitionIndex() != CLUSTER_METADATA_TOPIC_PARTITION.partition()) {
                    String msg = String.format("DescribeMetadataQuorum received a single partition with index %d when %d was expected",
-                        partition.partitionIndex(), CLUSTER_METADATA_TOPIC_PARTITION.partition());
+                        partition.partitionIndex(), CLUSTER_METADATA_TOPIC_PARTITION.partition());
                    log.debug(msg);
                    throw new UnknownServerException(msg);
                }
@@ -4768,19 +4764,19 @@ public class KafkaAdminClient extends AdminClient {
        final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
        final long now = time.milliseconds();
        final Call call = new Call("unregisterBroker", calcDeadlineMs(now, options.timeoutMs()),
-            new LeastLoadedBrokerOrActiveKController()) {
+                new LeastLoadedBrokerOrActiveKController()) {
            @Override
            UnregisterBrokerRequest.Builder createRequest(int timeoutMs) {
                UnregisterBrokerRequestData data =
-                    new UnregisterBrokerRequestData().setBrokerId(brokerId);
+                    new UnregisterBrokerRequestData().setBrokerId(brokerId);
                return new UnregisterBrokerRequest.Builder(data);
            }
            @Override
            void handleResponse(AbstractResponse abstractResponse) {
                final UnregisterBrokerResponse response =
-                    (UnregisterBrokerResponse) abstractResponse;
+                    (UnregisterBrokerResponse) abstractResponse;
                Errors error = Errors.forCode(response.data().errorCode());
                switch (error) {
                    case NONE:
@@ -4840,7 +4836,7 @@ public class KafkaAdminClient extends AdminClient {
     * where a coordinator may need to unilaterally terminate a participant transaction that hasn't completed.
     * </p>
     *
-     * @param transactionalId The transactional ID whose active transaction should be forcefully terminated.
+     * @param transactionalId The transactional ID whose active transaction should be forcefully terminated.
     * @return a {@link TerminateTransactionResult} that can be used to await the operation result.
     */
@Override
@@ -4879,6 +4875,45 @@ public class KafkaAdminClient extends AdminClient {
        return new FenceProducersResult(future.all());
    }
+    @Override
+    public ListConfigResourcesResult listConfigResources(Set<ConfigResource.Type> configResourceTypes, ListConfigResourcesOptions options) {
+        final long now = time.milliseconds();
+        final KafkaFutureImpl<Collection<ConfigResource>> future = new KafkaFutureImpl<>();
+        final Call call = new Call("listConfigResources", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) {
+
+            @Override
+            ListConfigResourcesRequest.Builder createRequest(int timeoutMs) {
+                return new ListConfigResourcesRequest.Builder(
+                    new ListConfigResourcesRequestData()
+                        .setResourceTypes(
+                            configResourceTypes
+                                .stream()
+                                .map(ConfigResource.Type::id)
+                                .collect(Collectors.toList())
+                        )
+                );
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                ListConfigResourcesResponse response = (ListConfigResourcesResponse) abstractResponse;
+                if (response.error().isFailure()) {
+                    future.completeExceptionally(response.error().exception());
+                } else {
+                    future.complete(response.configResources());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                future.completeExceptionally(throwable);
+            }
+        };
+        runnable.call(call, now);
+        return new ListConfigResourcesResult(future);
+    }
+
+    @SuppressWarnings({"deprecation", "removal"})
@Override
    public ListClientMetricsResourcesResult listClientMetricsResources(ListClientMetricsResourcesOptions options) {
        final long now = time.milliseconds();
@@ -4900,7 +4935,13 @@ public class KafkaAdminClient extends AdminClient {
                if (response.error().isFailure()) {
                    future.completeExceptionally(response.error().exception());
                } else {
-                    future.complete(response.clientMetricsResources());
+                    future.complete(response
+                        .data()
+                        .configResources()
+                        .stream()
+                        .filter(entry -> entry.resourceType() == ConfigResource.Type.CLIENT_METRICS.id())
+                        .map(entry -> new ClientMetricsResourceListing(entry.resourceName()))
+                        .collect(Collectors.toList()));
                }
            }
@@ -4924,7 +4965,7 @@ public class KafkaAdminClient extends AdminClient {
        final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
        final long now = time.milliseconds();
        final Call call = new Call(
-            "addRaftVoter", calcDeadlineMs(now, options.timeoutMs()), provider) {
+            "addRaftVoter", calcDeadlineMs(now, options.timeoutMs()), provider) {
            @Override
            AddRaftVoterRequest.Builder createRequest(int timeoutMs) {
@@ -4936,12 +4977,12 @@ public class KafkaAdminClient extends AdminClient {
                        setHost(endpoint.host()).
                        setPort(endpoint.port())));
                return new AddRaftVoterRequest.Builder(
-                    new AddRaftVoterRequestData().
-                        setClusterId(options.clusterId().orElse(null)).
-                        setTimeoutMs(timeoutMs).
-                        setVoterId(voterId) .
-                        setVoterDirectoryId(voterDirectoryId).
-                        setListeners(listeners));
+                    new AddRaftVoterRequestData().
+                        setClusterId(options.clusterId().orElse(null)).
+                        setTimeoutMs(timeoutMs).
+                        setVoterId(voterId).
+                        setVoterDirectoryId(voterDirectoryId).
+                        setListeners(listeners));
            }
            @Override
@@ -4978,14 +5019,14 @@ public class KafkaAdminClient extends AdminClient {
        final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
        final long now = time.milliseconds();
        final Call call = new Call(
-            "removeRaftVoter", calcDeadlineMs(now, options.timeoutMs()), provider) {
+            "removeRaftVoter", calcDeadlineMs(now, options.timeoutMs()), provider) {
            @Override
            RemoveRaftVoterRequest.Builder createRequest(int timeoutMs) {
                return new RemoveRaftVoterRequest.Builder(
                    new RemoveRaftVoterRequestData().
                        setClusterId(options.clusterId().orElse(null)).
-                        setVoterId(voterId) .
+                        setVoterId(voterId).
                        setVoterDirectoryId(voterDirectoryId));
            }
@@ -4995,8 +5036,8 @@ public class KafkaAdminClient extends AdminClient {
                RemoveRaftVoterResponse addResponse = (RemoveRaftVoterResponse) response;
                if (addResponse.data().errorCode() != Errors.NONE.code()) {
                    ApiError error = new ApiError(
-                        addResponse.data().errorCode(),
-                        addResponse.data().errorMessage());
+                        addResponse.data().errorCode(),
+                        addResponse.data().errorMessage());
                    future.completeExceptionally(error.exception());
                } else {
                    future.complete(null);
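For context, a minimal sketch of how the new admin call might be exercised end to end once this change lands; the bootstrap address and printed output are illustrative, not part of this patch:

    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListConfigResourcesOptions;
    import org.apache.kafka.common.config.ConfigResource;

    public class ListConfigResourcesExample {
        public static void main(String[] args) throws Exception {
            // "localhost:9092" is a placeholder bootstrap address.
            try (Admin admin = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
                // No types given: list every config resource known to the cluster.
                for (ConfigResource resource : admin.listConfigResources().all().get()) {
                    System.out.println(resource.type() + " -> " + resource.name());
                }
                // Restrict the listing to a single resource type.
                admin.listConfigResources(Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions())
                    .all().get()
                    .forEach(r -> System.out.println("client metrics: " + r.name()));
            }
        }
    }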
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesOptions.java
index 7b6dbf302c6..f90778db12c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesOptions.java
@@ -19,6 +19,8 @@ package org.apache.kafka.clients.admin;
 /**
  * Options for {@link Admin#listClientMetricsResources()}.
+ * @deprecated Since 4.1. Use {@link ListConfigResourcesOptions} instead.
  */
+@Deprecated(since = "4.1")
 public class ListClientMetricsResourcesOptions extends AbstractOptions<ListClientMetricsResourcesOptions> {
}
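For callers migrating off the deprecated API, the replacement follows the same pattern ClientMetricsCommand adopts later in this patch: request only CLIENT_METRICS resources and map each ConfigResource to its name. A rough sketch (the wrapper class and method names here are hypothetical):

    import java.util.Collection;
    import java.util.List;
    import java.util.Set;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.ListConfigResourcesOptions;
    import org.apache.kafka.common.config.ConfigResource;

    class ClientMetricsNames {
        // Equivalent of the deprecated listClientMetricsResources() under the new API.
        static List<String> fetch(Admin admin) throws Exception {
            Collection<ConfigResource> resources = admin
                .listConfigResources(Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions())
                .all()
                .get();
            return resources.stream().map(ConfigResource::name).toList();
        }
    }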
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesResult.java
index 4a63e31c238..a4d0ed3cecb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesResult.java
@@ -25,7 +25,9 @@ import java.util.Collection;
/**
* The result of the {@link Admin#listClientMetricsResources()} call.
* <p>
+ * @deprecated Since 4.1. Use {@link ListConfigResourcesResult} instead.
*/
+@Deprecated(since = "4.1")
public class ListClientMetricsResourcesResult {
private final KafkaFuture<Collection<ClientMetricsResourceListing>> future;
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConfigResourcesOptions.java
similarity index 83%
copy from clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesOptions.java
copy to clients/src/main/java/org/apache/kafka/clients/admin/ListConfigResourcesOptions.java
index 7b6dbf302c6..dbd8581c795 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConfigResourcesOptions.java
@@ -14,11 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kafka.clients.admin;
/**
- * Options for {@link Admin#listClientMetricsResources()}.
+ * Options for {@link Admin#listConfigResources()}.
*/
-public class ListClientMetricsResourcesOptions extends AbstractOptions<ListClientMetricsResourcesOptions> {
+public class ListConfigResourcesOptions extends AbstractOptions<ListConfigResourcesOptions> {
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConfigResourcesResult.java
similarity index 69%
copy from clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesResult.java
copy to clients/src/main/java/org/apache/kafka/clients/admin/ListConfigResourcesResult.java
index 4a63e31c238..fa9ad46a72c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConfigResourcesResult.java
@@ -14,39 +14,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import java.util.Collection;
/**
- * The result of the {@link Admin#listClientMetricsResources()} call.
+ * The result of the {@link Admin#listConfigResources()} call.
* <p>
*/
-public class ListClientMetricsResourcesResult {
- private final KafkaFuture<Collection<ClientMetricsResourceListing>> future;
+public class ListConfigResourcesResult {
+ private final KafkaFuture<Collection<ConfigResource>> future;
-    ListClientMetricsResourcesResult(KafkaFuture<Collection<ClientMetricsResourceListing>> future) {
+ ListConfigResourcesResult(KafkaFuture<Collection<ConfigResource>> future) {
this.future = future;
}
    /**
-     * Returns a future that yields either an exception, or the full set of client metrics
-     * listings.
+     * Returns a future that yields either an exception, or the full set of config resources.
     *
     * In the event of a failure, the future yields nothing but the first exception which
     * occurred.
     */
-    public KafkaFuture<Collection<ClientMetricsResourceListing>> all() {
-        final KafkaFutureImpl<Collection<ClientMetricsResourceListing>> result = new KafkaFutureImpl<>();
-        future.whenComplete((listings, throwable) -> {
+    public KafkaFuture<Collection<ConfigResource>> all() {
+        final KafkaFutureImpl<Collection<ConfigResource>> result = new KafkaFutureImpl<>();
+        future.whenComplete((resources, throwable) -> {
            if (throwable != null) {
                result.completeExceptionally(throwable);
            } else {
-                result.complete(listings);
+                result.complete(resources);
            }
        });
        return result;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesRequest.java
index 3af70938843..436d08c4909 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesRequest.java
@@ -17,12 +17,14 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ListConfigResourcesRequestData;
import org.apache.kafka.common.message.ListConfigResourcesResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
+import java.util.HashSet;
import java.util.Set;
public class ListConfigResourcesRequest extends AbstractRequest {
@@ -36,6 +38,15 @@ public class ListConfigResourcesRequest extends AbstractRequest {
        @Override
        public ListConfigResourcesRequest build(short version) {
+            if (version == 0) {
+                // v0 only supports the CLIENT_METRICS resource type.
+                Set<Byte> resourceTypes = new HashSet<>(data.resourceTypes());
+                if (resourceTypes.size() != 1 || !resourceTypes.contains(ConfigResource.Type.CLIENT_METRICS.id())) {
+                    throw new UnsupportedVersionException("The v0 ListConfigResources only supports CLIENT_METRICS");
+                }
+                // The v0 request has no resource types field, so create a fresh request data object.
+                return new ListConfigResourcesRequest(new ListConfigResourcesRequestData(), version);
+            }
            return new ListConfigResourcesRequest(data, version);
        }
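A short sketch of what this builder guard means for callers; the wrapper class and method names below are hypothetical, and the behavior mirrors the request-level tests later in this patch:

    import java.util.List;
    import org.apache.kafka.common.config.ConfigResource;
    import org.apache.kafka.common.errors.UnsupportedVersionException;
    import org.apache.kafka.common.message.ListConfigResourcesRequestData;
    import org.apache.kafka.common.requests.ListConfigResourcesRequest;

    class V0Guard {
        static void demo() {
            // CLIENT_METRICS alone is expressible at v0; the builder swaps in empty v0 request data.
            new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()
                .setResourceTypes(List.of(ConfigResource.Type.CLIENT_METRICS.id()))).build((short) 0);
            // Any other type cannot be expressed at v0, so the builder fails fast.
            try {
                new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()
                    .setResourceTypes(List.of(ConfigResource.Type.TOPIC.id()))).build((short) 0);
            } catch (UnsupportedVersionException expected) {
                // Admin callers see this as an UnsupportedVersionException on the result future.
            }
        }
    }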
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesResponse.java
index 36a4a807f7f..f9fa50d02a9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListConfigResourcesResponse.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.common.requests;
-import org.apache.kafka.clients.admin.ClientMetricsResourceListing;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.message.ListConfigResourcesResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -78,12 +77,4 @@ public class ListConfigResourcesResponse extends AbstractResponse {
            )
        ).collect(Collectors.toList());
    }
-
-    public Collection<ClientMetricsResourceListing> clientMetricsResources() {
-        return data.configResources()
-            .stream()
-            .filter(entry -> entry.resourceType() == ConfigResource.Type.CLIENT_METRICS.id())
-            .map(entry -> new ClientMetricsResourceListing(entry.resourceName()))
-            .collect(Collectors.toList());
-    }
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
index c98ffb9483f..36e7571d8dd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
@@ -163,17 +163,17 @@ public class AdminClientTestUtils {
        return new ListConsumerGroupOffsetsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group), future));
    }
-    public static ListClientMetricsResourcesResult listClientMetricsResourcesResult(String... names) {
-        return new ListClientMetricsResourcesResult(
-            KafkaFuture.completedFuture(Arrays.stream(names)
-                .map(ClientMetricsResourceListing::new)
-                .collect(Collectors.toList())));
+    public static ListConfigResourcesResult listConfigResourcesResult(String... names) {
+        return new ListConfigResourcesResult(
+            KafkaFuture.completedFuture(Arrays.stream(names)
+                .map(name -> new ConfigResource(ConfigResource.Type.CLIENT_METRICS, name))
+                .collect(Collectors.toList())));
    }
-    public static ListClientMetricsResourcesResult listClientMetricsResourcesResult(KafkaException exception) {
-        final KafkaFutureImpl<Collection<ClientMetricsResourceListing>> future = new KafkaFutureImpl<>();
+    public static ListConfigResourcesResult listConfigResourcesResult(KafkaException exception) {
+        final KafkaFutureImpl<Collection<ConfigResource>> future = new KafkaFutureImpl<>();
        future.completeExceptionally(exception);
-        return new ListClientMetricsResourcesResult(future);
+        return new ListConfigResourcesResult(future);
    }
    public static ListShareGroupOffsetsResult createListShareGroupOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> groupOffsets) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 83356b68ff5..d7139be1698 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -10664,6 +10664,7 @@ public class KafkaAdminClientTest {
member.memberEpoch());
}
+ @SuppressWarnings({"deprecation", "removal"})
@Test
public void testListClientMetricsResources() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
@@ -10697,6 +10698,7 @@ public class KafkaAdminClientTest {
}
}
+ @SuppressWarnings({"deprecation", "removal"})
@Test
public void testListClientMetricsResourcesEmpty() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
@@ -10714,6 +10716,7 @@ public class KafkaAdminClientTest {
}
}
+ @SuppressWarnings({"deprecation", "removal"})
@Test
public void testListClientMetricsResourcesNotSupported() {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
@@ -10729,6 +10732,70 @@ public class KafkaAdminClientTest {
}
}
+    @Test
+    public void testListConfigResources() throws Exception {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            List<ConfigResource> expected = List.of(
+                new ConfigResource(ConfigResource.Type.CLIENT_METRICS, "client-metrics"),
+                new ConfigResource(ConfigResource.Type.BROKER, "1"),
+                new ConfigResource(ConfigResource.Type.BROKER_LOGGER, "1"),
+                new ConfigResource(ConfigResource.Type.TOPIC, "topic"),
+                new ConfigResource(ConfigResource.Type.GROUP, "group")
+            );
+
+            ListConfigResourcesResponseData responseData =
+                new ListConfigResourcesResponseData().setErrorCode(Errors.NONE.code());
+
+            expected.forEach(c ->
+                responseData.configResources()
+                    .add(new ListConfigResourcesResponseData
+                        .ConfigResource()
+                        .setResourceName(c.name())
+                        .setResourceType(c.type().id())
+                    )
+            );
+
+            env.kafkaClient().prepareResponse(
+                request -> request instanceof ListConfigResourcesRequest,
+                new ListConfigResourcesResponse(responseData));
+
+            ListConfigResourcesResult result = env.adminClient().listConfigResources();
+            assertEquals(expected.size(), result.all().get().size());
+            assertEquals(new HashSet<>(expected), new HashSet<>(result.all().get()));
+        }
+    }
+
+    @Test
+    public void testListConfigResourcesEmpty() throws Exception {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            ListConfigResourcesResponseData responseData =
+                new ListConfigResourcesResponseData().setErrorCode(Errors.NONE.code());
+
+            env.kafkaClient().prepareResponse(
+                request -> request instanceof ListConfigResourcesRequest,
+                new ListConfigResourcesResponse(responseData));
+
+            ListConfigResourcesResult result = env.adminClient().listConfigResources();
+            assertTrue(result.all().get().isEmpty());
+        }
+    }
+
+    @Test
+    public void testListConfigResourcesNotSupported() {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().prepareResponse(
+                request -> request instanceof ListConfigResourcesRequest,
+                new ListConfigResourcesResponse(new ListConfigResourcesResponseData()
+                    .setErrorCode(Errors.UNSUPPORTED_VERSION.code())));
+
+            ListConfigResourcesResult result = env.adminClient().listConfigResources(
+                Set.of(ConfigResource.Type.UNKNOWN), new ListConfigResourcesOptions());
+
+            assertNotNull(result.all());
+            TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all());
+        }
+    }
+
    @Test
    public void testCallFailWithUnsupportedVersionExceptionDoesNotHaveConcurrentModificationException() throws InterruptedException {
        Cluster cluster = mockCluster(1, 0);
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 02a9e628b7e..37e18e15983 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -1394,6 +1394,12 @@ public class MockAdminClient extends AdminClient {
        throw new UnsupportedOperationException("Not implemented yet");
    }
+    @Override
+    public ListConfigResourcesResult listConfigResources(Set<ConfigResource.Type> configResourceTypes, ListConfigResourcesOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @SuppressWarnings("deprecation")
    @Override
    public ListClientMetricsResourcesResult listClientMetricsResources(ListClientMetricsResourcesOptions options) {
        KafkaFutureImpl<Collection<ClientMetricsResourceListing>> future = new KafkaFutureImpl<>();
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index aa50f9db018..7a3be68ff78 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -3637,7 +3637,10 @@ public class RequestResponseTest {
}
    private ListConfigResourcesRequest createListConfigResourcesRequest(short version) {
-        return new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()).build(version);
+        return version == 0 ?
+            new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()
+                .setResourceTypes(List.of(ConfigResource.Type.CLIENT_METRICS.id()))).build(version) :
+            new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()).build(version);
    }
private ListConfigResourcesResponse createListConfigResourcesResponse() {
@@ -3951,4 +3954,25 @@ public class RequestResponseTest {
            parseRequest(SASL_AUTHENTICATE, SASL_AUTHENTICATE.latestVersion(), accessor)).getMessage();
        assertEquals("Error reading byte array of 32767 byte(s): only 3 byte(s) available", msg);
    }
+
+    @Test
+    public void testListConfigResourcesRequestV0FailsWithConfigResourceTypeOtherThanClientMetrics() {
+        // A single type that is not CLIENT_METRICS
+        Arrays.stream(ConfigResource.Type.values())
+            .filter(t -> t != ConfigResource.Type.CLIENT_METRICS)
+            .forEach(t -> {
+                ListConfigResourcesRequestData data = new ListConfigResourcesRequestData()
+                    .setResourceTypes(List.of(t.id()));
+                assertThrows(UnsupportedVersionException.class, () -> new ListConfigResourcesRequest.Builder(data).build((short) 0));
+            });
+
+        // Multiple types, including CLIENT_METRICS
+        Arrays.stream(ConfigResource.Type.values())
+            .filter(t -> t != ConfigResource.Type.CLIENT_METRICS)
+            .forEach(t -> {
+                ListConfigResourcesRequestData data = new ListConfigResourcesRequestData()
+                    .setResourceTypes(List.of(t.id(), ConfigResource.Type.CLIENT_METRICS.id()));
+                assertThrows(UnsupportedVersionException.class, () -> new ListConfigResourcesRequest.Builder(data).build((short) 0));
+            });
+    }
 }
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 423e869ffd0..be0c9dea17b 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -3866,6 +3866,92 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
    } finally client.close(time.Duration.ZERO)
  }
+  @Test
+  def testListConfigResources(): Unit = {
+    client = createAdminClient
+
+    // Alter group and client metrics configs so that group and client metrics config resources exist
+    val clientMetric = "client-metrics"
+    val group = "group"
+    val clientMetricResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, clientMetric)
+    val groupResource = new ConfigResource(ConfigResource.Type.GROUP, group)
+    val alterResult = client.incrementalAlterConfigs(util.Map.of(
+      clientMetricResource,
+      util.Set.of(new AlterConfigOp(new ConfigEntry("interval.ms", "111"), AlterConfigOp.OpType.SET)),
+      groupResource,
+      util.Set.of(new AlterConfigOp(new ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "50000"), AlterConfigOp.OpType.SET))
+    ))
+    assertEquals(util.Set.of(clientMetricResource, groupResource), alterResult.values.keySet)
+    alterResult.all.get(15, TimeUnit.SECONDS)
+
+    ensureConsistentKRaftMetadata()
+
+    // When no config resource type is specified, all config resources are returned
+    var configResources = client.listConfigResources().all().get()
+    assertEquals(9, configResources.size())
+    brokerServers.foreach(b => {
+      assertTrue(configResources.contains(new ConfigResource(ConfigResource.Type.BROKER, b.config.nodeId.toString)))
+      assertTrue(configResources.contains(new ConfigResource(ConfigResource.Type.BROKER_LOGGER, b.config.nodeId.toString)))
+    })
+    assertTrue(configResources.contains(new ConfigResource(ConfigResource.Type.TOPIC, Topic.GROUP_METADATA_TOPIC_NAME)))
+    assertTrue(configResources.contains(groupResource))
+    assertTrue(configResources.contains(clientMetricResource))
+
+    // The BROKER config resource type retrieves only broker config resources
+    configResources = client.listConfigResources(util.Set.of(ConfigResource.Type.BROKER), new ListConfigResourcesOptions()).all().get()
+    assertEquals(3, configResources.size())
+    brokerServers.foreach(b => {
+      assertTrue(configResources.contains(new ConfigResource(ConfigResource.Type.BROKER, b.config.nodeId.toString)))
+      assertFalse(configResources.contains(new ConfigResource(ConfigResource.Type.BROKER_LOGGER, b.config.nodeId.toString)))
+    })
+    assertFalse(configResources.contains(new ConfigResource(ConfigResource.Type.TOPIC, Topic.GROUP_METADATA_TOPIC_NAME)))
+    assertFalse(configResources.contains(groupResource))
+    assertFalse(configResources.contains(clientMetricResource))
+
+    // The BROKER_LOGGER config resource type retrieves only broker logger config resources
+    configResources = client.listConfigResources(util.Set.of(ConfigResource.Type.BROKER_LOGGER), new ListConfigResourcesOptions()).all().get()
+    assertEquals(3, configResources.size())
+    brokerServers.foreach(b => {
+      assertFalse(configResources.contains(new ConfigResource(ConfigResource.Type.BROKER, b.config.nodeId.toString)))
+      assertTrue(configResources.contains(new ConfigResource(ConfigResource.Type.BROKER_LOGGER, b.config.nodeId.toString)))
+    })
+    assertFalse(configResources.contains(new ConfigResource(ConfigResource.Type.TOPIC, Topic.GROUP_METADATA_TOPIC_NAME)))
+    assertFalse(configResources.contains(groupResource))
+    assertFalse(configResources.contains(clientMetricResource))
+
+    // The TOPIC config resource type retrieves only topic config resources
+    configResources = client.listConfigResources(util.Set.of(ConfigResource.Type.TOPIC), new ListConfigResourcesOptions()).all().get()
+    assertEquals(1, configResources.size())
+    assertTrue(configResources.contains(new ConfigResource(ConfigResource.Type.TOPIC, Topic.GROUP_METADATA_TOPIC_NAME)))
+
+    // The GROUP config resource type retrieves only group config resources
+    configResources = client.listConfigResources(util.Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions()).all().get()
+    assertEquals(1, configResources.size())
+    assertTrue(configResources.contains(groupResource))
+
+    // The CLIENT_METRICS config resource type retrieves only client metrics config resources
+    configResources = client.listConfigResources(util.Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions()).all().get()
+    assertEquals(1, configResources.size())
+    assertTrue(configResources.contains(clientMetricResource))
+
+    // The UNKNOWN config resource type results in an UNSUPPORTED_VERSION error
+    assertThrows(classOf[ExecutionException], () => {
+      client.listConfigResources(util.Set.of(ConfigResource.Type.UNKNOWN), new ListConfigResourcesOptions()).all().get()
+    })
+  }
+
+  @Test
+  @Timeout(30)
+  def testListConfigResourcesTimeoutMs(): Unit = {
+    client = createInvalidAdminClient()
+    try {
+      val timeoutOption = new ListConfigResourcesOptions().timeoutMs(0)
+      val exception = assertThrows(classOf[ExecutionException], () =>
+        client.listConfigResources(util.Set.of(), timeoutOption).all().get())
+      assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    } finally client.close(time.Duration.ZERO)
+  }
+
  /**
   * Test that createTopics returns the dynamic configurations of the topics that were created.
   *
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index f698c8ef9ef..93d138e8946 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -11207,7 +11207,8 @@ class KafkaApisTest extends Logging {
  @Test
  def testListConfigResourcesV0(): Unit = {
-    val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()).build(0))
+    val request = buildRequest(new ListConfigResourcesRequest.Builder(
+      new ListConfigResourcesRequestData().setResourceTypes(util.List.of(ConfigResource.Type.CLIENT_METRICS.id))).build(0))
    metadataCache = mock(classOf[KRaftMetadataCache])
    val resources = util.Set.of("client-metric1", "client-metric2")
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
index 91871133d68..ae8e9f0924c 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
@@ -110,6 +110,8 @@ import org.apache.kafka.clients.admin.FenceProducersOptions;
import org.apache.kafka.clients.admin.FenceProducersResult;
import org.apache.kafka.clients.admin.ListClientMetricsResourcesOptions;
import org.apache.kafka.clients.admin.ListClientMetricsResourcesResult;
+import org.apache.kafka.clients.admin.ListConfigResourcesOptions;
+import org.apache.kafka.clients.admin.ListConfigResourcesResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
@@ -436,6 +438,12 @@ public class TestingMetricsInterceptingAdminClient extends AdminClient {
        return adminDelegate.fenceProducers(transactionalIds, options);
    }
+    @Override
+    public ListConfigResourcesResult listConfigResources(final Set<ConfigResource.Type> configResourceTypes, final ListConfigResourcesOptions options) {
+        return adminDelegate.listConfigResources(configResourceTypes, options);
+    }
+
+    @SuppressWarnings({"deprecation", "removal"})
    @Override
    public ListClientMetricsResourcesResult listClientMetricsResources(final ListClientMetricsResourcesOptions options) {
        return adminDelegate.listClientMetricsResources(options);
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java b/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java
index f9b02f78397..8dcb5e5a750 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java
@@ -21,9 +21,9 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
-import org.apache.kafka.clients.admin.ClientMetricsResourceListing;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.ListConfigResourcesOptions;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.utils.Exit;
@@ -43,6 +43,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -157,9 +158,11 @@ public class ClientMetricsCommand {
            if (entityNameOpt.isPresent()) {
                entities = Collections.singletonList(entityNameOpt.get());
            } else {
-                Collection<ClientMetricsResourceListing> resources = adminClient.listClientMetricsResources()
-                    .all().get(30, TimeUnit.SECONDS);
-                entities = resources.stream().map(ClientMetricsResourceListing::name).collect(Collectors.toList());
+                Collection<ConfigResource> resources = adminClient
+                    .listConfigResources(Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions())
+                    .all()
+                    .get(30, TimeUnit.SECONDS);
+                entities = resources.stream().map(ConfigResource::name).toList();
            }
            for (String entity : entities) {
@@ -170,9 +173,11 @@ public class ClientMetricsCommand {
        }
        public void listClientMetrics() throws Exception {
-            Collection<ClientMetricsResourceListing> resources = adminClient.listClientMetricsResources()
-                .all().get(30, TimeUnit.SECONDS);
-            String results = resources.stream().map(ClientMetricsResourceListing::name).collect(Collectors.joining("\n"));
+            Collection<ConfigResource> resources = adminClient
+                .listConfigResources(Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions())
+                .all()
+                .get(30, TimeUnit.SECONDS);
+            String results = resources.stream().map(ConfigResource::name).collect(Collectors.joining("\n"));
            System.out.println(results);
        }
diff --git a/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java
index c58748bf3c0..2fcf082f0a0 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
-import org.apache.kafka.clients.admin.ListClientMetricsResourcesResult;
+import org.apache.kafka.clients.admin.ListConfigResourcesResult;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Exit;
@@ -254,8 +254,8 @@ public class ClientMetricsCommandTest {
        Admin adminClient = mock(Admin.class);
        ClientMetricsCommand.ClientMetricsService service = new ClientMetricsCommand.ClientMetricsService(adminClient);
-        ListClientMetricsResourcesResult result = AdminClientTestUtils.listClientMetricsResourcesResult(clientMetricsName);
-        when(adminClient.listClientMetricsResources()).thenReturn(result);
+        ListConfigResourcesResult result = AdminClientTestUtils.listConfigResourcesResult(clientMetricsName);
+        when(adminClient.listConfigResources(any(), any())).thenReturn(result);
        ConfigResource cr = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, clientMetricsName);
        Config cfg = new Config(Collections.singleton(new ConfigEntry("metrics", "org.apache.kafka.producer.")));
        DescribeConfigsResult describeResult = AdminClientTestUtils.describeConfigsResult(cr, cfg);
@@ -278,8 +278,8 @@ public class ClientMetricsCommandTest {
        Admin adminClient = mock(Admin.class);
        ClientMetricsCommand.ClientMetricsService service = new ClientMetricsCommand.ClientMetricsService(adminClient);
-        ListClientMetricsResourcesResult result = AdminClientTestUtils.listClientMetricsResourcesResult("one", "two");
-        when(adminClient.listClientMetricsResources()).thenReturn(result);
+        ListConfigResourcesResult result = AdminClientTestUtils.listConfigResourcesResult("one", "two");
+        when(adminClient.listConfigResources(any(), any())).thenReturn(result);
        String capturedOutput = ToolsTestUtils.captureStandardOut(() -> {
            try {
@@ -296,8 +296,8 @@ public class ClientMetricsCommandTest {
        Admin adminClient = mock(Admin.class);
        ClientMetricsCommand.ClientMetricsService service = new ClientMetricsCommand.ClientMetricsService(adminClient);
-        ListClientMetricsResourcesResult result = AdminClientTestUtils.listClientMetricsResourcesResult(Errors.UNSUPPORTED_VERSION.exception());
-        when(adminClient.listClientMetricsResources()).thenReturn(result);
+        ListConfigResourcesResult result = AdminClientTestUtils.listConfigResourcesResult(Errors.UNSUPPORTED_VERSION.exception());
+        when(adminClient.listConfigResources(any(), any())).thenReturn(result);
        assertThrows(ExecutionException.class, () -> service.listClientMetrics());
    }