This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 42041f47729 MINOR: Refactor createResponseConfig to avoid collection 
copy and conversion (#19867)
42041f47729 is described below

commit 42041f47729947dc9da0302cb3b747427f5cef97
Author: Yunchi Pang <[email protected]>
AuthorDate: Wed Jul 2 06:32:11 2025 -0700

    MINOR: Refactor createResponseConfig to avoid collection copy and 
conversion (#19867)
    
    issue: https://github.com/apache/kafka/pull/19687/files#r2094574178
    
    Why:
    - To improve performance by avoiding redundant temporary collections and
    repeated method calls.
    - To make the utility more flexible for inputs from both Java and Scala.
    
    What:
    - Refactored `createResponseConfig` in `ConfigHelper.scala` by
    overloading the method to accept both Java maps and `AbstractConfig`.
    - Extracted helper functions to `ConfigHelperUtils` in the server
    module.
    
    Reviewers: Ken Huang <[email protected]>, Jhen-Yung Hsu
    <[email protected]>, TengYao Chi <[email protected]>, Chia-Ping
    Tsai <[email protected]>
---
 .../src/main/scala/kafka/server/ConfigHelper.scala | 42 +++-------
 .../org/apache/kafka/server/ConfigHelperUtils.java | 90 ++++++++++++++++++++++
 2 files changed, 102 insertions(+), 30 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ConfigHelper.scala 
b/core/src/main/scala/kafka/server/ConfigHelper.scala
index 9a992a55c74..743937b54fc 100644
--- a/core/src/main/scala/kafka/server/ConfigHelper.scala
+++ b/core/src/main/scala/kafka/server/ConfigHelper.scala
@@ -22,7 +22,7 @@ import kafka.network.RequestChannel
 import java.util.{Collections, Properties}
 import kafka.utils.Logging
 import org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigResource}
+import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
 import org.apache.kafka.common.errors.{ApiException, InvalidRequestException}
 import org.apache.kafka.common.internals.Topic
 import 
org.apache.kafka.common.message.DescribeConfigsRequestData.DescribeConfigsResource
@@ -34,6 +34,7 @@ import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
 import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC}
 import org.apache.kafka.coordinator.group.GroupConfig
 import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
+import org.apache.kafka.server.ConfigHelperUtils.createResponseConfig
 import org.apache.kafka.server.config.ServerTopicConfigSynonyms
 import org.apache.kafka.server.logger.LoggingController
 import org.apache.kafka.server.metrics.ClientMetricsConfigs
@@ -45,10 +46,6 @@ import scala.jdk.OptionConverters.RichOptional
 
 class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, 
configRepository: ConfigRepository) extends Logging {
 
-  def allConfigs(config: AbstractConfig): mutable.Map[String, Any] = {
-    config.originals.asScala.filter(_._2 != null) ++ 
config.nonInternalValues.asScala
-  }
-
   def handleDescribeConfigsRequest(
     request: RequestChannel.Request,
     authHelper: AuthHelper
@@ -86,21 +83,6 @@ class ConfigHelper(metadataCache: MetadataCache, config: 
KafkaConfig, configRepo
                       includeSynonyms: Boolean,
                       includeDocumentation: Boolean): 
List[DescribeConfigsResponseData.DescribeConfigsResult] = {
     resourceToConfigNames.map { resource =>
-
-      def createResponseConfig(configs: Map[String, Any],
-                               createConfigEntry: (String, Any) => 
DescribeConfigsResponseData.DescribeConfigsResourceResult): 
DescribeConfigsResponseData.DescribeConfigsResult = {
-        val filteredConfigPairs = if (resource.configurationKeys == null || 
resource.configurationKeys.isEmpty)
-          configs.toBuffer
-        else
-          configs.filter { case (configName, _) =>
-            resource.configurationKeys.asScala.contains(configName)
-          }.toBuffer
-
-        val configEntries = filteredConfigPairs.map { case (name, value) => 
createConfigEntry(name, value) }
-        new 
DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(Errors.NONE.code)
-          .setConfigs(configEntries.asJava)
-      }
-
       try {
         val configResult = ConfigResource.Type.forId(resource.resourceType) 
match {
           case ConfigResource.Type.TOPIC =>
@@ -109,7 +91,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: 
KafkaConfig, configRepo
             if (metadataCache.contains(topic)) {
               val topicProps = configRepository.topicConfig(topic)
               val logConfig = LogConfig.fromProps(config.extractLogConfigMap, 
topicProps)
-              createResponseConfig(allConfigs(logConfig), 
createTopicConfigEntry(logConfig, topicProps, includeSynonyms, 
includeDocumentation))
+              createResponseConfig(resource, logConfig, 
createTopicConfigEntry(logConfig, topicProps, includeSynonyms, 
includeDocumentation)(_, _))
             } else {
               new 
DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
                 
.setConfigs(Collections.emptyList[DescribeConfigsResponseData.DescribeConfigsResourceResult])
@@ -117,11 +99,11 @@ class ConfigHelper(metadataCache: MetadataCache, config: 
KafkaConfig, configRepo
 
           case ConfigResource.Type.BROKER =>
             if (resource.resourceName == null || resource.resourceName.isEmpty)
-              
createResponseConfig(config.dynamicConfig.currentDynamicDefaultConfigs,
-                createBrokerConfigEntry(perBrokerConfig = false, 
includeSynonyms, includeDocumentation))
+              createResponseConfig(resource, 
config.dynamicConfig.currentDynamicDefaultConfigs.asJava,
+                createBrokerConfigEntry(perBrokerConfig = false, 
includeSynonyms, includeDocumentation)(_, _))
             else if (resourceNameToBrokerId(resource.resourceName) == 
config.brokerId)
-              createResponseConfig(allConfigs(config),
-                createBrokerConfigEntry(perBrokerConfig = true, 
includeSynonyms, includeDocumentation))
+              createResponseConfig(resource, config,
+                createBrokerConfigEntry(perBrokerConfig = true, 
includeSynonyms, includeDocumentation)(_, _))
             else
               throw new InvalidRequestException(s"Unexpected broker id, 
expected ${config.brokerId} or empty string, but received 
${resource.resourceName}")
 
@@ -131,8 +113,8 @@ class ConfigHelper(metadataCache: MetadataCache, config: 
KafkaConfig, configRepo
             else if (resourceNameToBrokerId(resource.resourceName) != 
config.brokerId)
               throw new InvalidRequestException(s"Unexpected broker id, 
expected ${config.brokerId} but received ${resource.resourceName}")
             else
-              createResponseConfig(LoggingController.loggers.asScala,
-                (name, value) => new 
DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name)
+              createResponseConfig(resource, LoggingController.loggers,
+                (name: String, value: Object) => new 
DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name)
                   
.setValue(value.toString).setConfigSource(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG.id)
                   
.setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava))
 
@@ -142,7 +124,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: 
KafkaConfig, configRepo
             } else {
               val clientMetricsProps = configRepository.config(new 
ConfigResource(ConfigResource.Type.CLIENT_METRICS, resource.resourceName))
               val clientMetricsConfig = 
ClientMetricsConfigs.fromProps(ClientMetricsConfigs.defaultConfigsMap(), 
clientMetricsProps)
-              createResponseConfig(allConfigs(clientMetricsConfig), 
createClientMetricsConfigEntry(clientMetricsConfig, clientMetricsProps, 
includeSynonyms, includeDocumentation))
+              createResponseConfig(resource, clientMetricsConfig, 
createClientMetricsConfigEntry(clientMetricsConfig, clientMetricsProps, 
includeSynonyms, includeDocumentation)(_, _))
             }
 
           case ConfigResource.Type.GROUP =>
@@ -152,7 +134,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: 
KafkaConfig, configRepo
             } else {
               val groupProps = configRepository.groupConfig(group)
               val groupConfig = 
GroupConfig.fromProps(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig),
 groupProps)
-              createResponseConfig(allConfigs(groupConfig), 
createGroupConfigEntry(groupConfig, groupProps, includeSynonyms, 
includeDocumentation))
+              createResponseConfig(resource, groupConfig, 
createGroupConfigEntry(groupConfig, groupProps, includeSynonyms, 
includeDocumentation)(_, _))
             }
 
           case resourceType => throw new InvalidRequestException(s"Unsupported 
resource type: $resourceType")
@@ -322,4 +304,4 @@ class ConfigHelper(metadataCache: MetadataCache, config: 
KafkaConfig, configRepo
         throw new InvalidRequestException(s"Broker id must be an integer, but 
it is: $resourceName")
     }
   }
-}
+}
\ No newline at end of file
diff --git 
a/server/src/main/java/org/apache/kafka/server/ConfigHelperUtils.java 
b/server/src/main/java/org/apache/kafka/server/ConfigHelperUtils.java
new file mode 100644
index 00000000000..b104de4c156
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/ConfigHelperUtils.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.message.DescribeConfigsRequestData;
+import org.apache.kafka.common.message.DescribeConfigsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.BiFunction;
+import java.util.stream.Stream;
+
+public class ConfigHelperUtils {
+
+    /**
+     * Creates a DescribeConfigsResult from a Map of configs. Keys and values must be non-null because entries are copied via Map.entry.
+     */
+    public static <V> DescribeConfigsResponseData.DescribeConfigsResult 
createResponseConfig(
+            DescribeConfigsRequestData.DescribeConfigsResource resource,
+            Map<String, V> config,
+            BiFunction<String, Object, 
DescribeConfigsResponseData.DescribeConfigsResourceResult> createConfigEntry) {
+
+        return toDescribeConfigsResult(
+                config.entrySet().stream()
+                        .map(entry -> Map.entry(entry.getKey(), 
entry.getValue())),
+                resource,
+                createConfigEntry
+        );
+    }
+
+    /**
+     * Creates a DescribeConfigsResult from an AbstractConfig.
+     * This method merges the config's originals (excluding null values and keys 
present in nonInternalValues) with its nonInternalValues, which take priority.
+     */
+    public static DescribeConfigsResponseData.DescribeConfigsResult 
createResponseConfig(
+            DescribeConfigsRequestData.DescribeConfigsResource resource,
+            AbstractConfig config,
+            BiFunction<String, Object, 
DescribeConfigsResponseData.DescribeConfigsResourceResult> createConfigEntry) {
+
+        // Cast from Map<String, ?> to Map<String, Object> to eliminate 
wildcard types. Cached to avoid multiple calls.
+        @SuppressWarnings("unchecked")
+        Map<String, Object> nonInternalValues = (Map<String, Object>) 
config.nonInternalValues();
+        Stream<Entry<String, Object>> allEntries = Stream.concat(
+                config.originals().entrySet().stream()
+                        .filter(entry -> entry.getValue() != null && 
!nonInternalValues.containsKey(entry.getKey()))
+                        .map(entry -> Map.entry(entry.getKey(), 
entry.getValue())),
+                nonInternalValues.entrySet().stream()
+        );
+        return toDescribeConfigsResult(allEntries, resource, 
createConfigEntry);
+    }
+
+    /**
+     * Internal helper that builds a DescribeConfigsResult from a stream of 
config entries.
+     */
+    private static DescribeConfigsResponseData.DescribeConfigsResult 
toDescribeConfigsResult(
+            Stream<Entry<String, Object>> configStream,
+            DescribeConfigsRequestData.DescribeConfigsResource resource,
+            BiFunction<String, Object, 
DescribeConfigsResponseData.DescribeConfigsResourceResult> createConfigEntry) {
+
+        var configKeys = resource.configurationKeys();
+        List<DescribeConfigsResponseData.DescribeConfigsResourceResult> 
configEntries =
+                configStream
+                        .filter(entry -> configKeys == null ||
+                                configKeys.isEmpty() ||
+                                configKeys.contains(entry.getKey()))
+                        .map(entry -> createConfigEntry.apply(entry.getKey(), 
entry.getValue()))
+                        .toList();
+
+        return new DescribeConfigsResponseData.DescribeConfigsResult()
+                .setErrorCode(Errors.NONE.code())
+                .setConfigs(configEntries);
+    }
+}
\ No newline at end of file

Reply via email to