mimaison commented on a change in pull request #8312:
URL: https://github.com/apache/kafka/pull/8312#discussion_r429211490



##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
##########
@@ -174,19 +97,25 @@ public boolean isReadOnly() {
     }
 
     public enum ConfigSource {
-        UNKNOWN_CONFIG((byte) 0),
-        TOPIC_CONFIG((byte) 1),
-        DYNAMIC_BROKER_CONFIG((byte) 2),
-        DYNAMIC_DEFAULT_BROKER_CONFIG((byte) 3),
-        STATIC_BROKER_CONFIG((byte) 4),
-        DEFAULT_CONFIG((byte) 5),
-        DYNAMIC_BROKER_LOGGER_CONFIG((byte) 6);
+        UNKNOWN_CONFIG((byte) 0, 
org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.UNKNOWN),

Review comment:
       It's unfortunate the public enum cannot do the mapping and we have to 
keep a copy here. Could we maybe rename this one so this looks less crazy? WDYT?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1912,53 +1915,51 @@ public DescribeConfigsResult 
describeConfigs(Collection<ConfigResource> configRe
 
         // The non-BROKER resources which we want to describe.  These 
resources can be described by a
         // single, unified DescribeConfigs request.
-        final Collection<ConfigResource> unifiedRequestResources = new 
ArrayList<>(configResources.size());
+        final DescribeConfigsRequestData unifiedRequestResources = new 
DescribeConfigsRequestData();
 
         for (ConfigResource resource : configResources) {
             if (dependsOnSpecificNode(resource)) {
                 brokerFutures.put(resource, new KafkaFutureImpl<>());
                 brokerResources.add(resource);
             } else {
                 unifiedRequestFutures.put(resource, new KafkaFutureImpl<>());
-                unifiedRequestResources.add(resource);
+                unifiedRequestResources.resources().add(new 
DescribeConfigsResource()
+                        .setResourceName(resource.name())
+                        .setResourceType(resource.type().id()));
             }
         }
 
         final long now = time.milliseconds();
-        if (!unifiedRequestResources.isEmpty()) {
+        if (!unifiedRequestResources.resources().isEmpty()) {
             runnable.call(new Call("describeConfigs", calcDeadlineMs(now, 
options.timeoutMs()),
                 new LeastLoadedNodeProvider()) {
 
                 @Override
                 DescribeConfigsRequest.Builder createRequest(int timeoutMs) {
-                    return new 
DescribeConfigsRequest.Builder(unifiedRequestResources)
-                            .includeSynonyms(options.includeSynonyms());
+                    return new 
DescribeConfigsRequest.Builder(unifiedRequestResources.setIncludeSynoyms(options.includeSynonyms()));

Review comment:
       I wonder if it would read better if `unifiedRequestResources` was 
instead a `List<DescribeConfigsResource>` and we were creating the 
`DescribeConfigsRequestData` object and calling all its setters here. WDYT?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
##########
@@ -220,145 +153,66 @@ public ConfigSource source() {
         }
     }
 
+    public Map<ConfigResource, 
DescribeConfigsResponseData.DescribeConfigsResult> resultMap() {
+        return data().results().stream().collect(Collectors.toMap(
+            configsResult ->
+                    new 
ConfigResource(ConfigResource.Type.forId(configsResult.resourceType()),
+                            configsResult.resourceName()),
+            Function.identity()));
+    }
 
-    private final int throttleTimeMs;
-    private final Map<ConfigResource, Config> configs;
+    private final DescribeConfigsResponseData data;
 
-    public DescribeConfigsResponse(int throttleTimeMs, Map<ConfigResource, 
Config> configs) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.configs = Objects.requireNonNull(configs, "configs");
+    public DescribeConfigsResponse(DescribeConfigsResponseData data) {
+        this.data = data;
     }
 
-    public DescribeConfigsResponse(Struct struct) {
-        throttleTimeMs = struct.get(THROTTLE_TIME_MS);
-        Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME);
-        configs = new HashMap<>(resourcesArray.length);
-        for (Object resourceObj : resourcesArray) {
-            Struct resourceStruct = (Struct) resourceObj;
-
-            ApiError error = new ApiError(resourceStruct);
-            ConfigResource.Type resourceType = 
ConfigResource.Type.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME));
-            String resourceName = 
resourceStruct.getString(RESOURCE_NAME_KEY_NAME);
-            ConfigResource resource = new ConfigResource(resourceType, 
resourceName);
-
-            Object[] configEntriesArray = 
resourceStruct.getArray(CONFIG_ENTRIES_KEY_NAME);
-            List<ConfigEntry> configEntries = new 
ArrayList<>(configEntriesArray.length);
-            for (Object configEntriesObj: configEntriesArray) {
-                Struct configEntriesStruct = (Struct) configEntriesObj;
-                String configName = 
configEntriesStruct.getString(CONFIG_NAME_KEY_NAME);
-                String configValue = 
configEntriesStruct.getString(CONFIG_VALUE_KEY_NAME);
-                boolean isSensitive = 
configEntriesStruct.getBoolean(IS_SENSITIVE_KEY_NAME);
-                ConfigSource configSource;
-                if (configEntriesStruct.hasField(CONFIG_SOURCE_KEY_NAME))
-                    configSource = 
ConfigSource.forId(configEntriesStruct.getByte(CONFIG_SOURCE_KEY_NAME));
-                else if (configEntriesStruct.hasField(IS_DEFAULT_KEY_NAME)) {
-                    if (configEntriesStruct.getBoolean(IS_DEFAULT_KEY_NAME))
-                        configSource = ConfigSource.DEFAULT_CONFIG;
-                    else {
-                        switch (resourceType) {
-                            case BROKER:
-                                configSource = 
ConfigSource.STATIC_BROKER_CONFIG;
-                                break;
-                            case TOPIC:
-                                configSource = ConfigSource.TOPIC_CONFIG;
-                                break;
-                            default:
-                                configSource = ConfigSource.UNKNOWN_CONFIG;
-                                break;
+    public DescribeConfigsResponse(Struct struct, short version) {
+        this.data = new DescribeConfigsResponseData(struct, version);
+        if (version == 0) {
+            for (DescribeConfigsResult result : data.results()) {
+                for (DescribeConfigsResponseData.DescribeConfigsResourceResult 
x : result.configs()) {

Review comment:
       Let's give that variable a better name, what about `config`?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
##########
@@ -92,85 +48,43 @@ public Builder(Collection<ConfigResource> resources) {
 
         @Override
         public DescribeConfigsRequest build(short version) {
-            return new DescribeConfigsRequest(version, resourceToConfigNames, 
includeSynonyms);
+            return new DescribeConfigsRequest(data, version);
         }
     }
 
-    private final Map<ConfigResource, Collection<String>> 
resourceToConfigNames;
-    private final boolean includeSynonyms;
+    private final DescribeConfigsRequestData data;
 
-    public DescribeConfigsRequest(short version, Map<ConfigResource, 
Collection<String>> resourceToConfigNames, boolean includeSynonyms) {
+    public DescribeConfigsRequest(DescribeConfigsRequestData data, short 
version) {
         super(ApiKeys.DESCRIBE_CONFIGS, version);
-        this.resourceToConfigNames = 
Objects.requireNonNull(resourceToConfigNames, "resourceToConfigNames");
-        this.includeSynonyms = includeSynonyms;
+        this.data = data;
     }
 
     public DescribeConfigsRequest(Struct struct, short version) {
         super(ApiKeys.DESCRIBE_CONFIGS, version);
-        Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME);
-        resourceToConfigNames = new HashMap<>(resourcesArray.length);
-        for (Object resourceObj : resourcesArray) {
-            Struct resourceStruct = (Struct) resourceObj;
-            ConfigResource.Type resourceType = 
ConfigResource.Type.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME));
-            String resourceName = 
resourceStruct.getString(RESOURCE_NAME_KEY_NAME);
-
-            Object[] configNamesArray = 
resourceStruct.getArray(CONFIG_NAMES_KEY_NAME);
-            List<String> configNames = null;
-            if (configNamesArray != null) {
-                configNames = new ArrayList<>(configNamesArray.length);
-                for (Object configNameObj : configNamesArray)
-                    configNames.add((String) configNameObj);
-            }
-
-            resourceToConfigNames.put(new ConfigResource(resourceType, 
resourceName), configNames);
-        }
-        this.includeSynonyms = struct.hasField(INCLUDE_SYNONYMS) ? 
struct.getBoolean(INCLUDE_SYNONYMS) : false;
-    }
-
-    public Collection<ConfigResource> resources() {
-        return resourceToConfigNames.keySet();
+        this.data = new DescribeConfigsRequestData(struct, version);
     }
 
-    /**
-     * Return null if all config names should be returned.
-     */
-    public Collection<String> configNames(ConfigResource resource) {
-        return resourceToConfigNames.get(resource);
-    }
-
-    public boolean includeSynonyms() {
-        return includeSynonyms;
+    public DescribeConfigsRequestData data() {
+        return data;
     }
 
     @Override
     protected Struct toStruct() {
-        Struct struct = new 
Struct(ApiKeys.DESCRIBE_CONFIGS.requestSchema(version()));
-        List<Struct> resourceStructs = new ArrayList<>(resources().size());
-        for (Map.Entry<ConfigResource, Collection<String>> entry : 
resourceToConfigNames.entrySet()) {
-            ConfigResource resource = entry.getKey();
-            Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME);
-            resourceStruct.set(RESOURCE_TYPE_KEY_NAME, resource.type().id());
-            resourceStruct.set(RESOURCE_NAME_KEY_NAME, resource.name());
-
-            String[] configNames = entry.getValue() == null ? null : 
entry.getValue().toArray(new String[0]);
-            resourceStruct.set(CONFIG_NAMES_KEY_NAME, configNames);
-
-            resourceStructs.add(resourceStruct);
-        }
-        struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray(new Struct[0]));
-        struct.setIfExists(INCLUDE_SYNONYMS, includeSynonyms);
-        return struct;
+        return data.toStruct(version());
     }
 
     @Override
     public DescribeConfigsResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
-        ApiError error = ApiError.fromThrowable(e);
-        Map<ConfigResource, DescribeConfigsResponse.Config> errors = new 
HashMap<>(resources().size());
-        DescribeConfigsResponse.Config config = new 
DescribeConfigsResponse.Config(error,
-                Collections.emptyList());
-        for (ConfigResource resource : resources())
-            errors.put(resource, config);
-        return new DescribeConfigsResponse(throttleTimeMs, errors);
+        return new DescribeConfigsResponse(new DescribeConfigsResponseData()
+                .setThrottleTimeMs(throttleTimeMs)
+                .setResults(data.resources().stream().map(r -> {
+                    Errors error = Errors.forException(e);

Review comment:
       Can we move that line to the top?

##########
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -1845,34 +1857,59 @@ private DeleteAclsResponse createDeleteAclsResponse() {
     }
 
     private DescribeConfigsRequest createDescribeConfigsRequest(int version) {
-        return new DescribeConfigsRequest.Builder(asList(
-                new ConfigResource(ConfigResource.Type.BROKER, "0"),
-                new ConfigResource(ConfigResource.Type.TOPIC, "topic")))
-                .build((short) version);
+        return new DescribeConfigsRequest.Builder(new 
DescribeConfigsRequestData().setResources(asList(
+                new DescribeConfigsRequestData.DescribeConfigsResource()
+                        .setResourceType(ConfigResource.Type.BROKER.id())
+                        .setResourceName("0"),
+                new DescribeConfigsRequestData.DescribeConfigsResource()
+                        .setResourceType(ConfigResource.Type.TOPIC.id())
+                        .setResourceName("topic"))))
+            .build((short) version);
     }
 
     private DescribeConfigsRequest 
createDescribeConfigsRequestWithConfigEntries(int version) {
-        Map<ConfigResource, Collection<String>> resources = new HashMap<>();
-        resources.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), 
asList("foo", "bar"));
-        resources.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic"), 
null);
-        resources.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic 
a"), emptyList());
-        return new DescribeConfigsRequest.Builder(resources).build((short) 
version);
-    }
-
-    private DescribeConfigsResponse createDescribeConfigsResponse() {
-        Map<ConfigResource, DescribeConfigsResponse.Config> configs = new 
HashMap<>();
-        List<DescribeConfigsResponse.ConfigSynonym> synonyms = emptyList();
-        List<DescribeConfigsResponse.ConfigEntry> configEntries = asList(
-                new DescribeConfigsResponse.ConfigEntry("config_name", 
"config_value",
-                        
DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_CONFIG, true, false, 
synonyms),
-                new DescribeConfigsResponse.ConfigEntry("another_name", 
"another value",
-                        DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG, 
false, true, synonyms)
+        return new DescribeConfigsRequest.Builder(new 
DescribeConfigsRequestData().setResources(asList(
+                new DescribeConfigsRequestData.DescribeConfigsResource()
+                        .setResourceType(ConfigResource.Type.BROKER.id())
+                        .setResourceName("0")
+                        .setConfigurationKeys(asList("foo", "bar")),
+                new DescribeConfigsRequestData.DescribeConfigsResource()
+                        .setResourceType(ConfigResource.Type.TOPIC.id())
+                        .setResourceName("topic")
+                        .setConfigurationKeys(null),
+                new DescribeConfigsRequestData.DescribeConfigsResource()
+                        .setResourceType(ConfigResource.Type.TOPIC.id())
+                        .setResourceName("topic a")
+                        .setConfigurationKeys(Collections.emptyList()))))
+                .build((short) version);
+    }
+
+    private DescribeConfigsResponse createDescribeConfigsResponse(int version) 
{
+        List<DescribeConfigsSynonym> synonyms = Collections.emptyList();
+        List<DescribeConfigsResourceResult> configEntries = asList(
+                new 
DescribeConfigsResourceResult().setName("config_name").setValue("config_value")
+                .setConfigSource(version == 0 ? -1 : 
DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_CONFIG.id)
+                        .setIsDefault(false)
+                        
.setIsSensitive(true).setReadOnly(false).setSynonyms(synonyms),
+                new 
DescribeConfigsResourceResult().setName("another_name").setValue("another_value")
+                        .setIsDefault(false)
+                        .setConfigSource(version == 0 ? -1 : 
DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG.id)
+                        
.setIsSensitive(false).setReadOnly(true).setSynonyms(synonyms)
         );
-        configs.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), new 
DescribeConfigsResponse.Config(
-                ApiError.NONE, configEntries));
-        configs.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic"), 
new DescribeConfigsResponse.Config(
-                ApiError.NONE, 
Collections.<DescribeConfigsResponse.ConfigEntry>emptyList()));
-        return new DescribeConfigsResponse(200, configs);
+        List<DescribeConfigsResult> configs = asList(
+                new DescribeConfigsResult()
+                        .setResourceType(ConfigResource.Type.BROKER.id())
+                        .setResourceName("0")
+                        .setErrorCode(ApiError.NONE.error().code())

Review comment:
       We can use `Errors.NONE.code()` directly. Same below

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
##########
@@ -17,70 +17,26 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.DescribeConfigsRequestData;
+import org.apache.kafka.common.message.DescribeConfigsResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-
-import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
-import static org.apache.kafka.common.protocol.types.Type.INT8;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
+import java.util.stream.Collectors;
 
 public class DescribeConfigsRequest extends AbstractRequest {
 
-    private static final String RESOURCES_KEY_NAME = "resources";
-    private static final String INCLUDE_SYNONYMS = "include_synonyms";
-    private static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
-    private static final String RESOURCE_NAME_KEY_NAME = "resource_name";
-    private static final String CONFIG_NAMES_KEY_NAME = "config_names";
-
-    private static final Schema DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0 = new 
Schema(
-            new Field(RESOURCE_TYPE_KEY_NAME, INT8),
-            new Field(RESOURCE_NAME_KEY_NAME, STRING),
-            new Field(CONFIG_NAMES_KEY_NAME, ArrayOf.nullable(STRING)));
-
-    private static final Schema DESCRIBE_CONFIGS_REQUEST_V0 = new Schema(
-            new Field(RESOURCES_KEY_NAME, new 
ArrayOf(DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0), "An array of config resources to 
be returned."));
-
-    private static final Schema DESCRIBE_CONFIGS_REQUEST_V1 = new Schema(
-            new Field(RESOURCES_KEY_NAME, new 
ArrayOf(DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0), "An array of config resources to 
be returned."),
-            new Field(INCLUDE_SYNONYMS, BOOLEAN));
-
-    /**
-     * The version number is bumped to indicate that on quota violation 
brokers send out responses before throttling.
-     */
-    private static final Schema DESCRIBE_CONFIGS_REQUEST_V2 = 
DESCRIBE_CONFIGS_REQUEST_V1;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{DESCRIBE_CONFIGS_REQUEST_V0, 
DESCRIBE_CONFIGS_REQUEST_V1, DESCRIBE_CONFIGS_REQUEST_V2};
-    }
-
     public static class Builder extends 
AbstractRequest.Builder<DescribeConfigsRequest> {
-        private final Map<ConfigResource, Collection<String>> 
resourceToConfigNames;
-        private boolean includeSynonyms;
+        private final DescribeConfigsRequestData data;
 
-        public Builder(Map<ConfigResource, Collection<String>> 
resourceToConfigNames) {
+        public Builder(DescribeConfigsRequestData data) {
             super(ApiKeys.DESCRIBE_CONFIGS);
-            this.resourceToConfigNames = 
Objects.requireNonNull(resourceToConfigNames, "resourceToConfigNames");
-        }
-
-        public Builder includeSynonyms(boolean includeSynonyms) {
-            this.includeSynonyms = includeSynonyms;
-            return this;
-        }
-
-        public Builder(Collection<ConfigResource> resources) {
-            this(toResourceToConfigNames(resources));
+            this.data = data;

Review comment:
       As we remove the other constructors, `toResourceToConfigNames` is not 
called anymore. Let's delete it

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -347,59 +346,62 @@ class AdminManager(val config: KafkaConfig,
     }
   }
 
-  def describeConfigs(resourceToConfigNames: Map[ConfigResource, 
Option[Set[String]]], includeSynonyms: Boolean): Map[ConfigResource, 
DescribeConfigsResponse.Config] = {
-    resourceToConfigNames.map { case (resource, configNames) =>
+  def describeConfigs(resourceToConfigNames: List[DescribeConfigsResource], 
includeSynonyms: Boolean): 
List[DescribeConfigsResponseData.DescribeConfigsResult] = {
+    resourceToConfigNames.map { case (resource) =>
 
       def allConfigs(config: AbstractConfig) = {
         config.originals.asScala.filter(_._2 != null) ++ config.values.asScala
       }
       def createResponseConfig(configs: Map[String, Any],
-                               createConfigEntry: (String, Any) => 
DescribeConfigsResponse.ConfigEntry): DescribeConfigsResponse.Config = {
+                               createConfigEntry: (String, Any) => 
DescribeConfigsResponseData.DescribeConfigsResourceResult): 
DescribeConfigsResponseData.DescribeConfigsResult = {
         val filteredConfigPairs = configs.filter { case (configName, _) =>
           /* Always returns true if configNames is None */
-          configNames.forall(_.contains(configName))
+          resource.configurationKeys.asScala.forall(_.contains(configName))
         }.toBuffer
 
         val configEntries = filteredConfigPairs.map { case (name, value) => 
createConfigEntry(name, value) }
-        new DescribeConfigsResponse.Config(ApiError.NONE, configEntries.asJava)
+        new 
DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(ApiError.NONE.error().code())

Review comment:
       We can use `Errors.NONE.code` directly

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -2017,6 +2016,18 @@ void handleFailure(Throwable throwable) {
         return new DescribeConfigsResult(allFutures);
     }
 
+    private Config 
describeConfigResult(DescribeConfigsResponseData.DescribeConfigsResult 
describeConfigsResult) {

Review comment:
       It looks like we are not calling `configSynonyms()` anymore, so we can 
remove that private method

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2561,25 +2561,32 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleDescribeConfigsRequest(request: RequestChannel.Request): Unit = {
     val describeConfigsRequest = request.body[DescribeConfigsRequest]
-    val (authorizedResources, unauthorizedResources) = 
describeConfigsRequest.resources.asScala.toBuffer.partition { resource =>
-      resource.`type` match {
+    val (authorizedResources, unauthorizedResources) = 
describeConfigsRequest.data.resources.asScala.toBuffer.partition { resource =>
+      ConfigResource.Type.forId(resource.resourceType) match {
         case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
           authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)
         case ConfigResource.Type.TOPIC =>
-          authorize(request.context, DESCRIBE_CONFIGS, TOPIC, resource.name)
-        case rt => throw new InvalidRequestException(s"Unexpected resource 
type $rt for resource ${resource.name}")
+          authorize(request.context, DESCRIBE_CONFIGS, TOPIC, 
resource.resourceName)
+        case rt => throw new InvalidRequestException(s"Unexpected resource 
type $rt for resource ${resource.resourceName}")
       }
     }
-    val authorizedConfigs = 
adminManager.describeConfigs(authorizedResources.map { resource =>
-      resource -> 
Option(describeConfigsRequest.configNames(resource)).map(_.asScala.toSet)
-    }.toMap, describeConfigsRequest.includeSynonyms)
+    val authorizedConfigs = 
adminManager.describeConfigs(authorizedResources.toList, 
describeConfigsRequest.data.includeSynoyms)
     val unauthorizedConfigs = unauthorizedResources.map { resource =>
-      val error = configsAuthorizationApiError(resource)
-      resource -> new DescribeConfigsResponse.Config(error, 
util.Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
+      val error = ConfigResource.Type.forId(resource.resourceType()) match {

Review comment:
       We can remove the brackets here. Same a few lines below after 
`resourceName`

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -707,11 +718,12 @@ class AdminManager(val config: KafkaConfig,
       case _ => ConfigDef.convertToString(value, configEntryType.orNull)
     }
     val allSynonyms = configSynonyms(name, allNames, isSensitive)
-        .filter(perBrokerConfig || _.source == 
ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG)
+        .filter(perBrokerConfig || _.source == 
ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG.id())

Review comment:
       No need for the brackets after `id`

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1912,53 +1915,51 @@ public DescribeConfigsResult 
describeConfigs(Collection<ConfigResource> configRe
 
         // The non-BROKER resources which we want to describe.  These 
resources can be described by a
         // single, unified DescribeConfigs request.
-        final Collection<ConfigResource> unifiedRequestResources = new 
ArrayList<>(configResources.size());
+        final DescribeConfigsRequestData unifiedRequestResources = new 
DescribeConfigsRequestData();
 
         for (ConfigResource resource : configResources) {
             if (dependsOnSpecificNode(resource)) {
                 brokerFutures.put(resource, new KafkaFutureImpl<>());
                 brokerResources.add(resource);
             } else {
                 unifiedRequestFutures.put(resource, new KafkaFutureImpl<>());
-                unifiedRequestResources.add(resource);
+                unifiedRequestResources.resources().add(new 
DescribeConfigsResource()
+                        .setResourceName(resource.name())
+                        .setResourceType(resource.type().id()));
             }
         }
 
         final long now = time.milliseconds();
-        if (!unifiedRequestResources.isEmpty()) {
+        if (!unifiedRequestResources.resources().isEmpty()) {
             runnable.call(new Call("describeConfigs", calcDeadlineMs(now, 
options.timeoutMs()),
                 new LeastLoadedNodeProvider()) {
 
                 @Override
                 DescribeConfigsRequest.Builder createRequest(int timeoutMs) {
-                    return new 
DescribeConfigsRequest.Builder(unifiedRequestResources)
-                            .includeSynonyms(options.includeSynonyms());
+                    return new 
DescribeConfigsRequest.Builder(unifiedRequestResources.setIncludeSynoyms(options.includeSynonyms()));
                 }
 
                 @Override
                 void handleResponse(AbstractResponse abstractResponse) {
                     DescribeConfigsResponse response = 
(DescribeConfigsResponse) abstractResponse;
+                    Map<ConfigResource, 
DescribeConfigsResponseData.DescribeConfigsResult> configResponseMap = 
response.resultMap();
                     for (Map.Entry<ConfigResource, KafkaFutureImpl<Config>> 
entry : unifiedRequestFutures.entrySet()) {
                         ConfigResource configResource = entry.getKey();
                         KafkaFutureImpl<Config> future = entry.getValue();
-                        DescribeConfigsResponse.Config config = 
response.config(configResource);
-                        if (config == null) {
-                            future.completeExceptionally(new 
UnknownServerException(
-                                "Malformed broker response: missing config for 
" + configResource));
-                            continue;
-                        }
-                        if (config.error().isFailure()) {
-                            
future.completeExceptionally(config.error().exception());
-                            continue;
-                        }
-                        List<ConfigEntry> configEntries = new ArrayList<>();
-                        for (DescribeConfigsResponse.ConfigEntry configEntry : 
config.entries()) {
-                            configEntries.add(new 
ConfigEntry(configEntry.name(),
-                                    configEntry.value(), 
configSource(configEntry.source()),
-                                    configEntry.isSensitive(), 
configEntry.isReadOnly(),
-                                    configSynonyms(configEntry)));
+
+                        for (DescribeConfigsResponseData.DescribeConfigsResult 
resultData : response.data().results()) {

Review comment:
       We are not using `resultData` in this loop. Is that expected?

##########
File path: clients/src/main/resources/common/message/DescribeConfigsRequest.json
##########
@@ -17,7 +17,7 @@
   "apiKey": 32,
   "type": "request",
   "name": "DescribeConfigsRequest",
-  // Version 1 adds IncludeSynoyms.
+  // Version 1 adds IncludeSynonyms.

Review comment:
       Let's also fix the field name




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to