mimaison commented on a change in pull request #8312: URL: https://github.com/apache/kafka/pull/8312#discussion_r429211490
########## File path: clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java ########## @@ -174,19 +97,25 @@ public boolean isReadOnly() { } public enum ConfigSource { - UNKNOWN_CONFIG((byte) 0), - TOPIC_CONFIG((byte) 1), - DYNAMIC_BROKER_CONFIG((byte) 2), - DYNAMIC_DEFAULT_BROKER_CONFIG((byte) 3), - STATIC_BROKER_CONFIG((byte) 4), - DEFAULT_CONFIG((byte) 5), - DYNAMIC_BROKER_LOGGER_CONFIG((byte) 6); + UNKNOWN_CONFIG((byte) 0, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.UNKNOWN), Review comment: It's unfortunate the public enum cannot do the mapping and we have to keep a copy here. Could we maybe rename this one so this looks less crazy? WDYT? ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -1912,53 +1915,51 @@ public DescribeConfigsResult describeConfigs(Collection<ConfigResource> configRe // The non-BROKER resources which we want to describe. These resources can be described by a // single, unified DescribeConfigs request. 
- final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>(configResources.size()); + final DescribeConfigsRequestData unifiedRequestResources = new DescribeConfigsRequestData(); for (ConfigResource resource : configResources) { if (dependsOnSpecificNode(resource)) { brokerFutures.put(resource, new KafkaFutureImpl<>()); brokerResources.add(resource); } else { unifiedRequestFutures.put(resource, new KafkaFutureImpl<>()); - unifiedRequestResources.add(resource); + unifiedRequestResources.resources().add(new DescribeConfigsResource() + .setResourceName(resource.name()) + .setResourceType(resource.type().id())); } } final long now = time.milliseconds(); - if (!unifiedRequestResources.isEmpty()) { + if (!unifiedRequestResources.resources().isEmpty()) { runnable.call(new Call("describeConfigs", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { @Override DescribeConfigsRequest.Builder createRequest(int timeoutMs) { - return new DescribeConfigsRequest.Builder(unifiedRequestResources) - .includeSynonyms(options.includeSynonyms()); + return new DescribeConfigsRequest.Builder(unifiedRequestResources.setIncludeSynoyms(options.includeSynonyms())); Review comment: I wonder if it would read better if `unifiedRequestResources` was instead a `List<DescribeConfigsResource>` and we were creating the `DescribeConfigsRequestData` object and calling all its setters here. WDYT? 
########## File path: clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java ########## @@ -220,145 +153,66 @@ public ConfigSource source() { } } + public Map<ConfigResource, DescribeConfigsResponseData.DescribeConfigsResult> resultMap() { + return data().results().stream().collect(Collectors.toMap( + configsResult -> + new ConfigResource(ConfigResource.Type.forId(configsResult.resourceType()), + configsResult.resourceName()), + Function.identity())); + } - private final int throttleTimeMs; - private final Map<ConfigResource, Config> configs; + private final DescribeConfigsResponseData data; - public DescribeConfigsResponse(int throttleTimeMs, Map<ConfigResource, Config> configs) { - this.throttleTimeMs = throttleTimeMs; - this.configs = Objects.requireNonNull(configs, "configs"); + public DescribeConfigsResponse(DescribeConfigsResponseData data) { + this.data = data; } - public DescribeConfigsResponse(Struct struct) { - throttleTimeMs = struct.get(THROTTLE_TIME_MS); - Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME); - configs = new HashMap<>(resourcesArray.length); - for (Object resourceObj : resourcesArray) { - Struct resourceStruct = (Struct) resourceObj; - - ApiError error = new ApiError(resourceStruct); - ConfigResource.Type resourceType = ConfigResource.Type.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME)); - String resourceName = resourceStruct.getString(RESOURCE_NAME_KEY_NAME); - ConfigResource resource = new ConfigResource(resourceType, resourceName); - - Object[] configEntriesArray = resourceStruct.getArray(CONFIG_ENTRIES_KEY_NAME); - List<ConfigEntry> configEntries = new ArrayList<>(configEntriesArray.length); - for (Object configEntriesObj: configEntriesArray) { - Struct configEntriesStruct = (Struct) configEntriesObj; - String configName = configEntriesStruct.getString(CONFIG_NAME_KEY_NAME); - String configValue = configEntriesStruct.getString(CONFIG_VALUE_KEY_NAME); - boolean isSensitive = 
configEntriesStruct.getBoolean(IS_SENSITIVE_KEY_NAME); - ConfigSource configSource; - if (configEntriesStruct.hasField(CONFIG_SOURCE_KEY_NAME)) - configSource = ConfigSource.forId(configEntriesStruct.getByte(CONFIG_SOURCE_KEY_NAME)); - else if (configEntriesStruct.hasField(IS_DEFAULT_KEY_NAME)) { - if (configEntriesStruct.getBoolean(IS_DEFAULT_KEY_NAME)) - configSource = ConfigSource.DEFAULT_CONFIG; - else { - switch (resourceType) { - case BROKER: - configSource = ConfigSource.STATIC_BROKER_CONFIG; - break; - case TOPIC: - configSource = ConfigSource.TOPIC_CONFIG; - break; - default: - configSource = ConfigSource.UNKNOWN_CONFIG; - break; + public DescribeConfigsResponse(Struct struct, short version) { + this.data = new DescribeConfigsResponseData(struct, version); + if (version == 0) { + for (DescribeConfigsResult result : data.results()) { + for (DescribeConfigsResponseData.DescribeConfigsResourceResult x : result.configs()) { Review comment: Let's give that variable a better name, what about `config`? 
########## File path: clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java ########## @@ -92,85 +48,43 @@ public Builder(Collection<ConfigResource> resources) { @Override public DescribeConfigsRequest build(short version) { - return new DescribeConfigsRequest(version, resourceToConfigNames, includeSynonyms); + return new DescribeConfigsRequest(data, version); } } - private final Map<ConfigResource, Collection<String>> resourceToConfigNames; - private final boolean includeSynonyms; + private final DescribeConfigsRequestData data; - public DescribeConfigsRequest(short version, Map<ConfigResource, Collection<String>> resourceToConfigNames, boolean includeSynonyms) { + public DescribeConfigsRequest(DescribeConfigsRequestData data, short version) { super(ApiKeys.DESCRIBE_CONFIGS, version); - this.resourceToConfigNames = Objects.requireNonNull(resourceToConfigNames, "resourceToConfigNames"); - this.includeSynonyms = includeSynonyms; + this.data = data; } public DescribeConfigsRequest(Struct struct, short version) { super(ApiKeys.DESCRIBE_CONFIGS, version); - Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME); - resourceToConfigNames = new HashMap<>(resourcesArray.length); - for (Object resourceObj : resourcesArray) { - Struct resourceStruct = (Struct) resourceObj; - ConfigResource.Type resourceType = ConfigResource.Type.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME)); - String resourceName = resourceStruct.getString(RESOURCE_NAME_KEY_NAME); - - Object[] configNamesArray = resourceStruct.getArray(CONFIG_NAMES_KEY_NAME); - List<String> configNames = null; - if (configNamesArray != null) { - configNames = new ArrayList<>(configNamesArray.length); - for (Object configNameObj : configNamesArray) - configNames.add((String) configNameObj); - } - - resourceToConfigNames.put(new ConfigResource(resourceType, resourceName), configNames); - } - this.includeSynonyms = struct.hasField(INCLUDE_SYNONYMS) ? 
struct.getBoolean(INCLUDE_SYNONYMS) : false; - } - - public Collection<ConfigResource> resources() { - return resourceToConfigNames.keySet(); + this.data = new DescribeConfigsRequestData(struct, version); } - /** - * Return null if all config names should be returned. - */ - public Collection<String> configNames(ConfigResource resource) { - return resourceToConfigNames.get(resource); - } - - public boolean includeSynonyms() { - return includeSynonyms; + public DescribeConfigsRequestData data() { + return data; } @Override protected Struct toStruct() { - Struct struct = new Struct(ApiKeys.DESCRIBE_CONFIGS.requestSchema(version())); - List<Struct> resourceStructs = new ArrayList<>(resources().size()); - for (Map.Entry<ConfigResource, Collection<String>> entry : resourceToConfigNames.entrySet()) { - ConfigResource resource = entry.getKey(); - Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME); - resourceStruct.set(RESOURCE_TYPE_KEY_NAME, resource.type().id()); - resourceStruct.set(RESOURCE_NAME_KEY_NAME, resource.name()); - - String[] configNames = entry.getValue() == null ? 
null : entry.getValue().toArray(new String[0]); - resourceStruct.set(CONFIG_NAMES_KEY_NAME, configNames); - - resourceStructs.add(resourceStruct); - } - struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray(new Struct[0])); - struct.setIfExists(INCLUDE_SYNONYMS, includeSynonyms); - return struct; + return data.toStruct(version()); } @Override public DescribeConfigsResponse getErrorResponse(int throttleTimeMs, Throwable e) { - ApiError error = ApiError.fromThrowable(e); - Map<ConfigResource, DescribeConfigsResponse.Config> errors = new HashMap<>(resources().size()); - DescribeConfigsResponse.Config config = new DescribeConfigsResponse.Config(error, - Collections.emptyList()); - for (ConfigResource resource : resources()) - errors.put(resource, config); - return new DescribeConfigsResponse(throttleTimeMs, errors); + return new DescribeConfigsResponse(new DescribeConfigsResponseData() + .setThrottleTimeMs(throttleTimeMs) + .setResults(data.resources().stream().map(r -> { + Errors error = Errors.forException(e); Review comment: Can we move that line to the top? 
########## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ########## @@ -1845,34 +1857,59 @@ private DeleteAclsResponse createDeleteAclsResponse() { } private DescribeConfigsRequest createDescribeConfigsRequest(int version) { - return new DescribeConfigsRequest.Builder(asList( - new ConfigResource(ConfigResource.Type.BROKER, "0"), - new ConfigResource(ConfigResource.Type.TOPIC, "topic"))) - .build((short) version); + return new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData().setResources(asList( + new DescribeConfigsRequestData.DescribeConfigsResource() + .setResourceType(ConfigResource.Type.BROKER.id()) + .setResourceName("0"), + new DescribeConfigsRequestData.DescribeConfigsResource() + .setResourceType(ConfigResource.Type.TOPIC.id()) + .setResourceName("topic")))) + .build((short) version); } private DescribeConfigsRequest createDescribeConfigsRequestWithConfigEntries(int version) { - Map<ConfigResource, Collection<String>> resources = new HashMap<>(); - resources.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), asList("foo", "bar")); - resources.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic"), null); - resources.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic a"), emptyList()); - return new DescribeConfigsRequest.Builder(resources).build((short) version); - } - - private DescribeConfigsResponse createDescribeConfigsResponse() { - Map<ConfigResource, DescribeConfigsResponse.Config> configs = new HashMap<>(); - List<DescribeConfigsResponse.ConfigSynonym> synonyms = emptyList(); - List<DescribeConfigsResponse.ConfigEntry> configEntries = asList( - new DescribeConfigsResponse.ConfigEntry("config_name", "config_value", - DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_CONFIG, true, false, synonyms), - new DescribeConfigsResponse.ConfigEntry("another_name", "another value", - DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG, false, true, synonyms) + return new 
DescribeConfigsRequest.Builder(new DescribeConfigsRequestData().setResources(asList( + new DescribeConfigsRequestData.DescribeConfigsResource() + .setResourceType(ConfigResource.Type.BROKER.id()) + .setResourceName("0") + .setConfigurationKeys(asList("foo", "bar")), + new DescribeConfigsRequestData.DescribeConfigsResource() + .setResourceType(ConfigResource.Type.TOPIC.id()) + .setResourceName("topic") + .setConfigurationKeys(null), + new DescribeConfigsRequestData.DescribeConfigsResource() + .setResourceType(ConfigResource.Type.TOPIC.id()) + .setResourceName("topic a") + .setConfigurationKeys(Collections.emptyList())))) + .build((short) version); + } + + private DescribeConfigsResponse createDescribeConfigsResponse(int version) { + List<DescribeConfigsSynonym> synonyms = Collections.emptyList(); + List<DescribeConfigsResourceResult> configEntries = asList( + new DescribeConfigsResourceResult().setName("config_name").setValue("config_value") + .setConfigSource(version == 0 ? -1 : DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_CONFIG.id) + .setIsDefault(false) + .setIsSensitive(true).setReadOnly(false).setSynonyms(synonyms), + new DescribeConfigsResourceResult().setName("another_name").setValue("another_value") + .setIsDefault(false) + .setConfigSource(version == 0 ? 
-1 : DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG.id) + .setIsSensitive(false).setReadOnly(true).setSynonyms(synonyms) ); - configs.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), new DescribeConfigsResponse.Config( - ApiError.NONE, configEntries)); - configs.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic"), new DescribeConfigsResponse.Config( - ApiError.NONE, Collections.<DescribeConfigsResponse.ConfigEntry>emptyList())); - return new DescribeConfigsResponse(200, configs); + List<DescribeConfigsResult> configs = asList( + new DescribeConfigsResult() + .setResourceType(ConfigResource.Type.BROKER.id()) + .setResourceName("0") + .setErrorCode(ApiError.NONE.error().code()) Review comment: We can use `Errors.NONE.code()` directly. Same below ########## File path: clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java ########## @@ -17,70 +17,26 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.message.DescribeConfigsRequestData; +import org.apache.kafka.common.message.DescribeConfigsResponseData; import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Objects; - -import static org.apache.kafka.common.protocol.types.Type.BOOLEAN; -import static org.apache.kafka.common.protocol.types.Type.INT8; -import static org.apache.kafka.common.protocol.types.Type.STRING; +import java.util.stream.Collectors; public class DescribeConfigsRequest extends AbstractRequest { - private static final 
String RESOURCES_KEY_NAME = "resources"; - private static final String INCLUDE_SYNONYMS = "include_synonyms"; - private static final String RESOURCE_TYPE_KEY_NAME = "resource_type"; - private static final String RESOURCE_NAME_KEY_NAME = "resource_name"; - private static final String CONFIG_NAMES_KEY_NAME = "config_names"; - - private static final Schema DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0 = new Schema( - new Field(RESOURCE_TYPE_KEY_NAME, INT8), - new Field(RESOURCE_NAME_KEY_NAME, STRING), - new Field(CONFIG_NAMES_KEY_NAME, ArrayOf.nullable(STRING))); - - private static final Schema DESCRIBE_CONFIGS_REQUEST_V0 = new Schema( - new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0), "An array of config resources to be returned.")); - - private static final Schema DESCRIBE_CONFIGS_REQUEST_V1 = new Schema( - new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0), "An array of config resources to be returned."), - new Field(INCLUDE_SYNONYMS, BOOLEAN)); - - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. 
- */ - private static final Schema DESCRIBE_CONFIGS_REQUEST_V2 = DESCRIBE_CONFIGS_REQUEST_V1; - - public static Schema[] schemaVersions() { - return new Schema[]{DESCRIBE_CONFIGS_REQUEST_V0, DESCRIBE_CONFIGS_REQUEST_V1, DESCRIBE_CONFIGS_REQUEST_V2}; - } - public static class Builder extends AbstractRequest.Builder<DescribeConfigsRequest> { - private final Map<ConfigResource, Collection<String>> resourceToConfigNames; - private boolean includeSynonyms; + private final DescribeConfigsRequestData data; - public Builder(Map<ConfigResource, Collection<String>> resourceToConfigNames) { + public Builder(DescribeConfigsRequestData data) { super(ApiKeys.DESCRIBE_CONFIGS); - this.resourceToConfigNames = Objects.requireNonNull(resourceToConfigNames, "resourceToConfigNames"); - } - - public Builder includeSynonyms(boolean includeSynonyms) { - this.includeSynonyms = includeSynonyms; - return this; - } - - public Builder(Collection<ConfigResource> resources) { - this(toResourceToConfigNames(resources)); + this.data = data; Review comment: As we remove the other constructors, `toResourceToConfigNames` is not called anymore. 
Let's delete it ########## File path: core/src/main/scala/kafka/server/AdminManager.scala ########## @@ -347,59 +346,62 @@ class AdminManager(val config: KafkaConfig, } } - def describeConfigs(resourceToConfigNames: Map[ConfigResource, Option[Set[String]]], includeSynonyms: Boolean): Map[ConfigResource, DescribeConfigsResponse.Config] = { - resourceToConfigNames.map { case (resource, configNames) => + def describeConfigs(resourceToConfigNames: List[DescribeConfigsResource], includeSynonyms: Boolean): List[DescribeConfigsResponseData.DescribeConfigsResult] = { + resourceToConfigNames.map { case (resource) => def allConfigs(config: AbstractConfig) = { config.originals.asScala.filter(_._2 != null) ++ config.values.asScala } def createResponseConfig(configs: Map[String, Any], - createConfigEntry: (String, Any) => DescribeConfigsResponse.ConfigEntry): DescribeConfigsResponse.Config = { + createConfigEntry: (String, Any) => DescribeConfigsResponseData.DescribeConfigsResourceResult): DescribeConfigsResponseData.DescribeConfigsResult = { val filteredConfigPairs = configs.filter { case (configName, _) => /* Always returns true if configNames is None */ - configNames.forall(_.contains(configName)) + resource.configurationKeys.asScala.forall(_.contains(configName)) }.toBuffer val configEntries = filteredConfigPairs.map { case (name, value) => createConfigEntry(name, value) } - new DescribeConfigsResponse.Config(ApiError.NONE, configEntries.asJava) + new DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(ApiError.NONE.error().code()) Review comment: We can use `Errors.NONE.code` directly ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -2017,6 +2016,18 @@ void handleFailure(Throwable throwable) { return new DescribeConfigsResult(allFutures); } + private Config describeConfigResult(DescribeConfigsResponseData.DescribeConfigsResult describeConfigsResult) { Review comment: It looks like we are not calling 
`configSynonyms()` anymore, so we can remove that private method ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -2561,25 +2561,32 @@ class KafkaApis(val requestChannel: RequestChannel, def handleDescribeConfigsRequest(request: RequestChannel.Request): Unit = { val describeConfigsRequest = request.body[DescribeConfigsRequest] - val (authorizedResources, unauthorizedResources) = describeConfigsRequest.resources.asScala.toBuffer.partition { resource => - resource.`type` match { + val (authorizedResources, unauthorizedResources) = describeConfigsRequest.data.resources.asScala.toBuffer.partition { resource => + ConfigResource.Type.forId(resource.resourceType) match { case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER => authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME) case ConfigResource.Type.TOPIC => - authorize(request.context, DESCRIBE_CONFIGS, TOPIC, resource.name) - case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}") + authorize(request.context, DESCRIBE_CONFIGS, TOPIC, resource.resourceName) + case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.resourceName}") } } - val authorizedConfigs = adminManager.describeConfigs(authorizedResources.map { resource => - resource -> Option(describeConfigsRequest.configNames(resource)).map(_.asScala.toSet) - }.toMap, describeConfigsRequest.includeSynonyms) + val authorizedConfigs = adminManager.describeConfigs(authorizedResources.toList, describeConfigsRequest.data.includeSynoyms) val unauthorizedConfigs = unauthorizedResources.map { resource => - val error = configsAuthorizationApiError(resource) - resource -> new DescribeConfigsResponse.Config(error, util.Collections.emptyList[DescribeConfigsResponse.ConfigEntry]) + val error = ConfigResource.Type.forId(resource.resourceType()) match { Review comment: We can remove the brackets here. 
Same a few lines below after `resourceName` ########## File path: core/src/main/scala/kafka/server/AdminManager.scala ########## @@ -707,11 +718,12 @@ class AdminManager(val config: KafkaConfig, case _ => ConfigDef.convertToString(value, configEntryType.orNull) } val allSynonyms = configSynonyms(name, allNames, isSensitive) - .filter(perBrokerConfig || _.source == ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG) + .filter(perBrokerConfig || _.source == ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG.id()) Review comment: No need for the brackets after `id` ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -1912,53 +1915,51 @@ public DescribeConfigsResult describeConfigs(Collection<ConfigResource> configRe // The non-BROKER resources which we want to describe. These resources can be described by a // single, unified DescribeConfigs request. - final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>(configResources.size()); + final DescribeConfigsRequestData unifiedRequestResources = new DescribeConfigsRequestData(); for (ConfigResource resource : configResources) { if (dependsOnSpecificNode(resource)) { brokerFutures.put(resource, new KafkaFutureImpl<>()); brokerResources.add(resource); } else { unifiedRequestFutures.put(resource, new KafkaFutureImpl<>()); - unifiedRequestResources.add(resource); + unifiedRequestResources.resources().add(new DescribeConfigsResource() + .setResourceName(resource.name()) + .setResourceType(resource.type().id())); } } final long now = time.milliseconds(); - if (!unifiedRequestResources.isEmpty()) { + if (!unifiedRequestResources.resources().isEmpty()) { runnable.call(new Call("describeConfigs", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { @Override DescribeConfigsRequest.Builder createRequest(int timeoutMs) { - return new DescribeConfigsRequest.Builder(unifiedRequestResources) - .includeSynonyms(options.includeSynonyms()); + return new 
DescribeConfigsRequest.Builder(unifiedRequestResources.setIncludeSynoyms(options.includeSynonyms())); } @Override void handleResponse(AbstractResponse abstractResponse) { DescribeConfigsResponse response = (DescribeConfigsResponse) abstractResponse; + Map<ConfigResource, DescribeConfigsResponseData.DescribeConfigsResult> configResponseMap = response.resultMap(); for (Map.Entry<ConfigResource, KafkaFutureImpl<Config>> entry : unifiedRequestFutures.entrySet()) { ConfigResource configResource = entry.getKey(); KafkaFutureImpl<Config> future = entry.getValue(); - DescribeConfigsResponse.Config config = response.config(configResource); - if (config == null) { - future.completeExceptionally(new UnknownServerException( - "Malformed broker response: missing config for " + configResource)); - continue; - } - if (config.error().isFailure()) { - future.completeExceptionally(config.error().exception()); - continue; - } - List<ConfigEntry> configEntries = new ArrayList<>(); - for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) { - configEntries.add(new ConfigEntry(configEntry.name(), - configEntry.value(), configSource(configEntry.source()), - configEntry.isSensitive(), configEntry.isReadOnly(), - configSynonyms(configEntry))); + + for (DescribeConfigsResponseData.DescribeConfigsResult resultData : response.data().results()) { Review comment: We are not using `resultData` in this loop. Is that expected? ########## File path: clients/src/main/resources/common/message/DescribeConfigsRequest.json ########## @@ -17,7 +17,7 @@ "apiKey": 32, "type": "request", "name": "DescribeConfigsRequest", - // Version 1 adds IncludeSynoyms. + // Version 1 adds IncludeSynonyms. Review comment: Let's also fix the field name ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: users@infra.apache.org