mimaison commented on a change in pull request #8312:
URL: https://github.com/apache/kafka/pull/8312#discussion_r438708511



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1916,129 +1917,96 @@ void handleFailure(Throwable throwable) {
 
     @Override
     public DescribeConfigsResult describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options) {
-        final Map<ConfigResource, KafkaFutureImpl<Config>> unifiedRequestFutures = new HashMap<>();
-        final Map<ConfigResource, KafkaFutureImpl<Config>> brokerFutures = new HashMap<>(configResources.size());
-
-        // The BROKER resources which we want to describe.  We must make a separate DescribeConfigs
-        // request for every BROKER resource we want to describe.
-        final Collection<ConfigResource> brokerResources = new ArrayList<>();
-
-        // The non-BROKER resources which we want to describe.  These resources can be described by a
-        // single, unified DescribeConfigs request.
-        final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>(configResources.size());
+        // Partition the requested config resources based on which broker they must be sent to with the
+        // null broker being used for config resources which can be obtained from any broker
+        final Map<Integer, Map<ConfigResource, KafkaFutureImpl<Config>>> brokerFutures = new HashMap<>(configResources.size());
 
         for (ConfigResource resource : configResources) {
-            if (dependsOnSpecificNode(resource)) {
-                brokerFutures.put(resource, new KafkaFutureImpl<>());
-                brokerResources.add(resource);
-            } else {
-                unifiedRequestFutures.put(resource, new KafkaFutureImpl<>());
-                unifiedRequestResources.add(resource);
-            }
+            Integer broker = nodeFor(resource);
+            brokerFutures.compute(broker, (key, value) -> {
+                if (value == null) {
+                    // Only BROKER and BROKER_LOGGER configs are broker-specific
+                    value = new HashMap<>(broker != null ? 2 : 16);

Review comment:
       I wonder if we can just use a `HashMap` with the default initial capacity here. I don't expect this code path to be very hot.
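
       Just to illustrate, a rough sketch of that simplification, reusing the names from the diff above (`brokerFutures`, `nodeFor`, `configResources`); only the map sizing changes:

           for (ConfigResource resource : configResources) {
               Integer broker = nodeFor(resource);
               brokerFutures.compute(broker, (key, value) -> {
                   if (value == null) {
                       // default initial capacity is fine here; this path isn't hot
                       value = new HashMap<>();
                   }
                   value.put(resource, new KafkaFutureImpl<>());
                   return value;
               });
           }

       `computeIfAbsent(broker, key -> new HashMap<>())` would shorten this further, though that goes beyond the sizing question.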

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1916,129 +1917,96 @@ void handleFailure(Throwable throwable) {
 
     @Override
     public DescribeConfigsResult describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options) {
-        final Map<ConfigResource, KafkaFutureImpl<Config>> unifiedRequestFutures = new HashMap<>();
-        final Map<ConfigResource, KafkaFutureImpl<Config>> brokerFutures = new HashMap<>(configResources.size());
-
-        // The BROKER resources which we want to describe.  We must make a separate DescribeConfigs
-        // request for every BROKER resource we want to describe.
-        final Collection<ConfigResource> brokerResources = new ArrayList<>();
-
-        // The non-BROKER resources which we want to describe.  These resources can be described by a
-        // single, unified DescribeConfigs request.
-        final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>(configResources.size());
+        // Partition the requested config resources based on which broker they must be sent to with the
+        // null broker being used for config resources which can be obtained from any broker
+        final Map<Integer, Map<ConfigResource, KafkaFutureImpl<Config>>> brokerFutures = new HashMap<>(configResources.size());
 
         for (ConfigResource resource : configResources) {
-            if (dependsOnSpecificNode(resource)) {
-                brokerFutures.put(resource, new KafkaFutureImpl<>());
-                brokerResources.add(resource);
-            } else {
-                unifiedRequestFutures.put(resource, new KafkaFutureImpl<>());
-                unifiedRequestResources.add(resource);
-            }
+            Integer broker = nodeFor(resource);
+            brokerFutures.compute(broker, (key, value) -> {
+                if (value == null) {
+                    // Only BROKER and BROKER_LOGGER configs are broker-specific
+                    value = new HashMap<>(broker != null ? 2 : 16);
+                }
+                value.put(resource, new KafkaFutureImpl<>());
+                return value;
+            });
         }
 
         final long now = time.milliseconds();
-        if (!unifiedRequestResources.isEmpty()) {
+        for (Map.Entry<Integer, Map<ConfigResource, KafkaFutureImpl<Config>>> entry : brokerFutures.entrySet()) {
+            Integer broker = entry.getKey();
+            Map<ConfigResource, KafkaFutureImpl<Config>> unified = entry.getValue();
+
             runnable.call(new Call("describeConfigs", calcDeadlineMs(now, options.timeoutMs()),
-                new LeastLoadedNodeProvider()) {
+                broker != null ? new ConstantNodeIdProvider(broker) : new LeastLoadedNodeProvider()) {
 
                 @Override
                 DescribeConfigsRequest.Builder createRequest(int timeoutMs) {
-                    return new DescribeConfigsRequest.Builder(unifiedRequestResources)
-                            .includeSynonyms(options.includeSynonyms())
-                            .includeDocumentation(options.includeDocumentation());
+                    return new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData()
+                        .setResources(unified.keySet().stream()
+                            .map(config ->
+                                new DescribeConfigsRequestData.DescribeConfigsResource()
+                                    .setResourceName(config.name())
+                                    .setResourceType(config.type().id()))
+                            .collect(Collectors.toList()))
+                        .setIncludeSynonyms(options.includeSynonyms())
+                        .setIncludeDocumentation(options.includeDocumentation()));
                 }
 
                 @Override
                 void handleResponse(AbstractResponse abstractResponse) {
                     DescribeConfigsResponse response = (DescribeConfigsResponse) abstractResponse;
-                    for (Map.Entry<ConfigResource, KafkaFutureImpl<Config>> entry : unifiedRequestFutures.entrySet()) {
+                    for (Map.Entry<ConfigResource, DescribeConfigsResponseData.DescribeConfigsResult> entry : response.resultMap().entrySet()) {
                         ConfigResource configResource = entry.getKey();
-                        KafkaFutureImpl<Config> future = entry.getValue();
-                        DescribeConfigsResponse.Config config = response.config(configResource);
-                        if (config == null) {
-                            future.completeExceptionally(new UnknownServerException(
-                                "Malformed broker response: missing config for " + configResource));
-                            continue;
-                        }
-                        if (config.error().isFailure()) {
-                            future.completeExceptionally(config.error().exception());
-                            continue;
-                        }
-                        List<ConfigEntry> configEntries = new ArrayList<>();
-                        for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) {
-                            configEntries.add(new ConfigEntry(configEntry.name(),
-                                    configEntry.value(), configSource(configEntry.source()),
-                                    configEntry.isSensitive(), configEntry.isReadOnly(),
-                                    configSynonyms(configEntry), configType(configEntry.type()),
-                                    configEntry.documentation()));
+                        DescribeConfigsResponseData.DescribeConfigsResult describeConfigsResult = entry.getValue();
+                        KafkaFutureImpl<Config> future = unified.get(configResource);
+                        if (future == null) {
+                            if (broker != null) {
+                                log.warn("The config {} in the response from broker {} is not in the request",
+                                        configResource, broker);
+                            } else {
+                                log.warn("The config {} in the response from the least loaded broker is not in the request",
+                                        configResource);
+                            }
+                        } else {
+                            if (describeConfigsResult.errorCode() != Errors.NONE.code()) {
+                                future.completeExceptionally(Errors.forCode(describeConfigsResult.errorCode())
+                                        .exception(describeConfigsResult.errorMessage()));
+                            } else {
+                                future.complete(describeConfigResult(describeConfigsResult));
+                            }
                         }
-                        future.complete(new Config(configEntries));
                     }
+                    completeUnrealizedFutures(
+                        unified.entrySet().stream(),
+                        configResource -> "The broker response did not contain a result for config resource " + configResource);
                 }
 
                 @Override
                 void handleFailure(Throwable throwable) {
-                    completeAllExceptionally(unifiedRequestFutures.values(), throwable);
+                    completeAllExceptionally(unified.values(), throwable);
                 }
             }, now);
         }
 
-        for (Map.Entry<ConfigResource, KafkaFutureImpl<Config>> entry : brokerFutures.entrySet()) {
-            final KafkaFutureImpl<Config> brokerFuture = entry.getValue();
-            final ConfigResource resource = entry.getKey();
-            final int nodeId = Integer.parseInt(resource.name());
-            runnable.call(new Call("describeBrokerConfigs", calcDeadlineMs(now, options.timeoutMs()),
-                    new ConstantNodeIdProvider(nodeId)) {
-
-                @Override
-                DescribeConfigsRequest.Builder createRequest(int timeoutMs) {
-                    return new DescribeConfigsRequest.Builder(Collections.singleton(resource))
-                            .includeSynonyms(options.includeSynonyms())
-                            .includeDocumentation(options.includeDocumentation());
-                }
-
-                @Override
-                void handleResponse(AbstractResponse abstractResponse) {
-                    DescribeConfigsResponse response = (DescribeConfigsResponse) abstractResponse;
-                    DescribeConfigsResponse.Config config = response.configs().get(resource);
-
-                    if (config == null) {
-                        brokerFuture.completeExceptionally(new UnknownServerException(
-                            "Malformed broker response: missing config for " + resource));
-                        return;
-                    }
-                    if (config.error().isFailure())
-                        brokerFuture.completeExceptionally(config.error().exception());
-                    else {
-                        List<ConfigEntry> configEntries = new ArrayList<>();
-                        for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) {
-                            configEntries.add(new ConfigEntry(configEntry.name(), configEntry.value(),
-                                configSource(configEntry.source()), configEntry.isSensitive(), configEntry.isReadOnly(),
-                                configSynonyms(configEntry), configType(configEntry.type()), configEntry.documentation()));

Review comment:
       It looks like `configType()` is not used anymore. Can we delete it?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2568,25 +2568,32 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleDescribeConfigsRequest(request: RequestChannel.Request): Unit = {
     val describeConfigsRequest = request.body[DescribeConfigsRequest]
-    val (authorizedResources, unauthorizedResources) = describeConfigsRequest.resources.asScala.toBuffer.partition { resource =>
-      resource.`type` match {
+    val (authorizedResources, unauthorizedResources) = describeConfigsRequest.data.resources.asScala.toBuffer.partition { resource =>
+      ConfigResource.Type.forId(resource.resourceType) match {
         case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
           authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)
         case ConfigResource.Type.TOPIC =>
-          authorize(request.context, DESCRIBE_CONFIGS, TOPIC, resource.name)
-        case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}")
+          authorize(request.context, DESCRIBE_CONFIGS, TOPIC, resource.resourceName)
+        case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.resourceName}")
       }
     }
-    val authorizedConfigs = adminManager.describeConfigs(authorizedResources.map { resource =>
-      resource -> Option(describeConfigsRequest.configNames(resource)).map(_.asScala.toSet)
-    }.toMap, describeConfigsRequest.includeSynonyms, describeConfigsRequest.includeDocumentation)
+    val authorizedConfigs = adminManager.describeConfigs(authorizedResources.toList, describeConfigsRequest.data.includeSynonyms, describeConfigsRequest.data.includeDocumentation)
     val unauthorizedConfigs = unauthorizedResources.map { resource =>
-      val error = configsAuthorizationApiError(resource)
-      resource -> new DescribeConfigsResponse.Config(error, util.Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
+      val error = ConfigResource.Type.forId(resource.resourceType) match {
+        case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER => Errors.CLUSTER_AUTHORIZATION_FAILED
+        case ConfigResource.Type.TOPIC => Errors.TOPIC_AUTHORIZATION_FAILED
+        case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.resourceName}")
+      }
+      new DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(error.code)
+        .setErrorMessage(error.message)
+        .setConfigs(util.Collections.emptyList[DescribeConfigsResponseData.DescribeConfigsResourceResult])

Review comment:
       We can drop the `util.` prefix here; we already import `java.util.{Collections, Optional}`, so this can just be `Collections.emptyList[DescribeConfigsResponseData.DescribeConfigsResourceResult]`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

