This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 42041f47729 MINOR: Refactor createResponseConfig to avoid collection
copy and conversion (#19867)
42041f47729 is described below
commit 42041f47729947dc9da0302cb3b747427f5cef97
Author: Yunchi Pang <[email protected]>
AuthorDate: Wed Jul 2 06:32:11 2025 -0700
MINOR: Refactor createResponseConfig to avoid collection copy and
conversion (#19867)
issue: https://github.com/apache/kafka/pull/19687/files#r2094574178
Why:
- To improve performance by avoiding redundant temporary collections and
repeated method calls.
- To make the utility more flexible for inputs from both Java and Scala.
What:
- Refactored `createResponseConfig` in `ConfigHelper.scala` by
overloading the method to accept both Java maps and `AbstractConfig`.
- Extracted helper functions to `ConfigHelperUtils` in the server
module.
Reviewers: Ken Huang <[email protected]>, Jhen-Yung Hsu
<[email protected]>, TengYao Chi <[email protected]>, Chia-Ping
Tsai <[email protected]>
---
.../src/main/scala/kafka/server/ConfigHelper.scala | 42 +++-------
.../org/apache/kafka/server/ConfigHelperUtils.java | 90 ++++++++++++++++++++++
2 files changed, 102 insertions(+), 30 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ConfigHelper.scala
b/core/src/main/scala/kafka/server/ConfigHelper.scala
index 9a992a55c74..743937b54fc 100644
--- a/core/src/main/scala/kafka/server/ConfigHelper.scala
+++ b/core/src/main/scala/kafka/server/ConfigHelper.scala
@@ -22,7 +22,7 @@ import kafka.network.RequestChannel
import java.util.{Collections, Properties}
import kafka.utils.Logging
import org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef,
ConfigResource}
+import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
import org.apache.kafka.common.errors.{ApiException, InvalidRequestException}
import org.apache.kafka.common.internals.Topic
import
org.apache.kafka.common.message.DescribeConfigsRequestData.DescribeConfigsResource
@@ -34,6 +34,7 @@ import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC}
import org.apache.kafka.coordinator.group.GroupConfig
import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
+import org.apache.kafka.server.ConfigHelperUtils.createResponseConfig
import org.apache.kafka.server.config.ServerTopicConfigSynonyms
import org.apache.kafka.server.logger.LoggingController
import org.apache.kafka.server.metrics.ClientMetricsConfigs
@@ -45,10 +46,6 @@ import scala.jdk.OptionConverters.RichOptional
class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig,
configRepository: ConfigRepository) extends Logging {
- def allConfigs(config: AbstractConfig): mutable.Map[String, Any] = {
- config.originals.asScala.filter(_._2 != null) ++
config.nonInternalValues.asScala
- }
-
def handleDescribeConfigsRequest(
request: RequestChannel.Request,
authHelper: AuthHelper
@@ -86,21 +83,6 @@ class ConfigHelper(metadataCache: MetadataCache, config:
KafkaConfig, configRepo
includeSynonyms: Boolean,
includeDocumentation: Boolean):
List[DescribeConfigsResponseData.DescribeConfigsResult] = {
resourceToConfigNames.map { resource =>
-
- def createResponseConfig(configs: Map[String, Any],
- createConfigEntry: (String, Any) =>
DescribeConfigsResponseData.DescribeConfigsResourceResult):
DescribeConfigsResponseData.DescribeConfigsResult = {
- val filteredConfigPairs = if (resource.configurationKeys == null ||
resource.configurationKeys.isEmpty)
- configs.toBuffer
- else
- configs.filter { case (configName, _) =>
- resource.configurationKeys.asScala.contains(configName)
- }.toBuffer
-
- val configEntries = filteredConfigPairs.map { case (name, value) =>
createConfigEntry(name, value) }
- new
DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(Errors.NONE.code)
- .setConfigs(configEntries.asJava)
- }
-
try {
val configResult = ConfigResource.Type.forId(resource.resourceType)
match {
case ConfigResource.Type.TOPIC =>
@@ -109,7 +91,7 @@ class ConfigHelper(metadataCache: MetadataCache, config:
KafkaConfig, configRepo
if (metadataCache.contains(topic)) {
val topicProps = configRepository.topicConfig(topic)
val logConfig = LogConfig.fromProps(config.extractLogConfigMap,
topicProps)
- createResponseConfig(allConfigs(logConfig),
createTopicConfigEntry(logConfig, topicProps, includeSynonyms,
includeDocumentation))
+ createResponseConfig(resource, logConfig,
createTopicConfigEntry(logConfig, topicProps, includeSynonyms,
includeDocumentation)(_, _))
} else {
new
DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
.setConfigs(Collections.emptyList[DescribeConfigsResponseData.DescribeConfigsResourceResult])
@@ -117,11 +99,11 @@ class ConfigHelper(metadataCache: MetadataCache, config:
KafkaConfig, configRepo
case ConfigResource.Type.BROKER =>
if (resource.resourceName == null || resource.resourceName.isEmpty)
-
createResponseConfig(config.dynamicConfig.currentDynamicDefaultConfigs,
- createBrokerConfigEntry(perBrokerConfig = false,
includeSynonyms, includeDocumentation))
+ createResponseConfig(resource,
config.dynamicConfig.currentDynamicDefaultConfigs.asJava,
+ createBrokerConfigEntry(perBrokerConfig = false,
includeSynonyms, includeDocumentation)(_, _))
else if (resourceNameToBrokerId(resource.resourceName) ==
config.brokerId)
- createResponseConfig(allConfigs(config),
- createBrokerConfigEntry(perBrokerConfig = true,
includeSynonyms, includeDocumentation))
+ createResponseConfig(resource, config,
+ createBrokerConfigEntry(perBrokerConfig = true,
includeSynonyms, includeDocumentation)(_, _))
else
throw new InvalidRequestException(s"Unexpected broker id,
expected ${config.brokerId} or empty string, but received
${resource.resourceName}")
@@ -131,8 +113,8 @@ class ConfigHelper(metadataCache: MetadataCache, config:
KafkaConfig, configRepo
else if (resourceNameToBrokerId(resource.resourceName) !=
config.brokerId)
throw new InvalidRequestException(s"Unexpected broker id,
expected ${config.brokerId} but received ${resource.resourceName}")
else
- createResponseConfig(LoggingController.loggers.asScala,
- (name, value) => new
DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name)
+ createResponseConfig(resource, LoggingController.loggers,
+ (name: String, value: Object) => new
DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name)
.setValue(value.toString).setConfigSource(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG.id)
.setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava))
@@ -142,7 +124,7 @@ class ConfigHelper(metadataCache: MetadataCache, config:
KafkaConfig, configRepo
} else {
val clientMetricsProps = configRepository.config(new
ConfigResource(ConfigResource.Type.CLIENT_METRICS, resource.resourceName))
val clientMetricsConfig =
ClientMetricsConfigs.fromProps(ClientMetricsConfigs.defaultConfigsMap(),
clientMetricsProps)
- createResponseConfig(allConfigs(clientMetricsConfig),
createClientMetricsConfigEntry(clientMetricsConfig, clientMetricsProps,
includeSynonyms, includeDocumentation))
+ createResponseConfig(resource, clientMetricsConfig,
createClientMetricsConfigEntry(clientMetricsConfig, clientMetricsProps,
includeSynonyms, includeDocumentation)(_, _))
}
case ConfigResource.Type.GROUP =>
@@ -152,7 +134,7 @@ class ConfigHelper(metadataCache: MetadataCache, config:
KafkaConfig, configRepo
} else {
val groupProps = configRepository.groupConfig(group)
val groupConfig =
GroupConfig.fromProps(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig),
groupProps)
- createResponseConfig(allConfigs(groupConfig),
createGroupConfigEntry(groupConfig, groupProps, includeSynonyms,
includeDocumentation))
+ createResponseConfig(resource, groupConfig,
createGroupConfigEntry(groupConfig, groupProps, includeSynonyms,
includeDocumentation)(_, _))
}
case resourceType => throw new InvalidRequestException(s"Unsupported
resource type: $resourceType")
@@ -322,4 +304,4 @@ class ConfigHelper(metadataCache: MetadataCache, config:
KafkaConfig, configRepo
throw new InvalidRequestException(s"Broker id must be an integer, but
it is: $resourceName")
}
}
-}
+}
\ No newline at end of file
diff --git
a/server/src/main/java/org/apache/kafka/server/ConfigHelperUtils.java
b/server/src/main/java/org/apache/kafka/server/ConfigHelperUtils.java
new file mode 100644
index 00000000000..b104de4c156
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/ConfigHelperUtils.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.message.DescribeConfigsRequestData;
+import org.apache.kafka.common.message.DescribeConfigsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.BiFunction;
+import java.util.stream.Stream;
+
+public class ConfigHelperUtils {
+
+ /**
+ * Creates a DescribeConfigsResult from a Map of configs.
+ *
+ * Every entry of the map is re-wrapped with Map.entry and handed to
+ * toDescribeConfigsResult, which applies the key filter requested by
+ * the resource and converts each surviving entry with createConfigEntry.
+ *
+ * NOTE(review): Map.entry rejects null keys and null values, so a map
+ * holding a null value would fail with a NullPointerException here.
+ * Presumably callers never pass null values; confirm against callers.
+ *
+ * @param <V> value type of the supplied map
+ * @param resource the describe-configs request resource; its
+ * configurationKeys() decides which config names are returned
+ * @param config config name to config value
+ * @param createConfigEntry builds one response entry from a config name
+ * and its value
+ * @return a result with error code NONE and one entry per selected config
+ */
+ public static <V> DescribeConfigsResponseData.DescribeConfigsResult
createResponseConfig(
+ DescribeConfigsRequestData.DescribeConfigsResource resource,
+ Map<String, V> config,
+ BiFunction<String, Object,
DescribeConfigsResponseData.DescribeConfigsResourceResult> createConfigEntry) {
+
+ return toDescribeConfigsResult(
+ config.entrySet().stream()
+ .map(entry -> Map.entry(entry.getKey(),
entry.getValue())),
+ resource,
+ createConfigEntry
+ );
+ }
+
+ /**
+ * Creates a DescribeConfigsResult from an AbstractConfig.
+ *
+ * The entries come from two sources, concatenated in this order:
+ * first the entries of config.originals() whose value is non-null and
+ * whose key is absent from config.nonInternalValues(), then every entry
+ * of config.nonInternalValues(). A key present in both is therefore
+ * reported once, with the value from nonInternalValues().
+ *
+ * @param resource the describe-configs request resource; its
+ * configurationKeys() decides which config names are returned
+ * @param config the config whose originals and non-internal values are
+ * described
+ * @param createConfigEntry builds one response entry from a config name
+ * and its value
+ * @return a result with error code NONE and one entry per selected config
+ */
+ public static DescribeConfigsResponseData.DescribeConfigsResult
createResponseConfig(
+ DescribeConfigsRequestData.DescribeConfigsResource resource,
+ AbstractConfig config,
+ BiFunction<String, Object,
DescribeConfigsResponseData.DescribeConfigsResourceResult> createConfigEntry) {
+
+ // Cast from Map<String, ?> to Map<String, Object> to eliminate
wildcard types. Cached to avoid multiple calls.
+ @SuppressWarnings("unchecked")
+ Map<String, Object> nonInternalValues = (Map<String, Object>)
config.nonInternalValues();
+ Stream<Entry<String, Object>> allEntries = Stream.concat(
+ config.originals().entrySet().stream()
+ .filter(entry -> entry.getValue() != null &&
!nonInternalValues.containsKey(entry.getKey()))
+ .map(entry -> Map.entry(entry.getKey(),
entry.getValue())),
+ nonInternalValues.entrySet().stream()
+ );
+ return toDescribeConfigsResult(allEntries, resource,
createConfigEntry);
+ }
+
+ /**
+ * Internal helper that builds a DescribeConfigsResult from a stream of
+ * config entries.
+ *
+ * An entry is kept when resource.configurationKeys() is null, is empty,
+ * or contains the entry's key; otherwise it is dropped. Each kept entry
+ * is converted with createConfigEntry and the results are collected with
+ * Stream.toList(), so the stream is consumed by this call.
+ *
+ * @param configStream the candidate config entries (name to value)
+ * @param resource the describe-configs request resource supplying the
+ * optional list of requested config names
+ * @param createConfigEntry builds one response entry from a config name
+ * and its value
+ * @return a result with error code NONE carrying the converted entries
+ */
+ private static DescribeConfigsResponseData.DescribeConfigsResult
toDescribeConfigsResult(
+ Stream<Entry<String, Object>> configStream,
+ DescribeConfigsRequestData.DescribeConfigsResource resource,
+ BiFunction<String, Object,
DescribeConfigsResponseData.DescribeConfigsResourceResult> createConfigEntry) {
+
+ var configKeys = resource.configurationKeys();
+ List<DescribeConfigsResponseData.DescribeConfigsResourceResult>
configEntries =
+ configStream
+ .filter(entry -> configKeys == null ||
+ configKeys.isEmpty() ||
+ configKeys.contains(entry.getKey()))
+ .map(entry -> createConfigEntry.apply(entry.getKey(),
entry.getValue()))
+ .toList();
+
+ return new DescribeConfigsResponseData.DescribeConfigsResult()
+ .setErrorCode(Errors.NONE.code())
+ .setConfigs(configEntries);
+ }
+}
\ No newline at end of file