[ https://issues.apache.org/jira/browse/KAFKA-13689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17510767#comment-17510767 ]
Guozhang Wang commented on KAFKA-13689:
---------------------------------------

Okay, I think we are really on the same page here, just using different terminology :) I'm coming from the Streams usage. For example, let's say we create a StreamsConfig object from:

```
StreamsConfig config = new StreamsConfig(map);
```

If we call

```
config.get(PREDEFINED_CONFIG_NAME)
```

then the param must be a defined config name, otherwise it will throw. If we call

```
config.originals().get(ANY_CONFIG_NAME)
```

then it reads from the underlying map directly, so any config name can be retrieved, including custom unknown configs. That's how custom configs are retrieved (like you said, some modules also use `(config.originals() UNION config.values()).get(ANY_CONFIG_NAME)`).

Now I realize that `logUnused` is triggered when the consumer/producer clients are constructed. The custom configs of embedded modules may not have been retrieved by then, because those modules are only constructed and initialized later. In that case the `used` set would not yet contain any of those unknown configs. As a result:

1) All defined, used configs should be included in `used`, since they are retrieved via the first call above. This is the case we want to guarantee, and we should WARN if it does not hold, since that may indicate a bug.
2) All defined but disabled configs are included in `used` because `config.ignore()` is called on them. This is what we want to fix in this JIRA.
3) All unknown configs may or may not be included in `used`, and that's out of the AbstractConfig object's control.

So by doing the above two, we can fix this JIRA ticket, which is case 2). However, `logUnused` could still report configs from case 1) or case 3), whereas we really only want to WARN on case 1). The latter issue is a bit out of the scope of this JIRA, which is why I said just doing the above two is sufficient for this issue.
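To make the three cases above concrete, here is a minimal, self-contained sketch. `ToyConfig`, `originalsGet` and the key `my.module.custom` are hypothetical names made up for illustration, and the class is not Kafka's `AbstractConfig`: it only imitates the behaviour described above (a defined-name `get` that throws for undefined names and records use, a raw read through the originals, and an `ignore()` that marks a disabled config as used). It throws `IllegalArgumentException` instead of Kafka's `ConfigException` to stay dependency-free, and it assumes that reading through the originals also marks the key as used.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of the config behaviour discussed above; NOT Kafka's AbstractConfig.
class ToyConfig {
    private final Map<String, Object> originals;   // everything the user supplied
    private final Map<String, Object> values;      // only the defined configs
    private final Set<String> used = new HashSet<>();

    ToyConfig(Set<String> definedNames, Map<String, Object> supplied) {
        this.originals = new HashMap<>(supplied);
        this.values = new HashMap<>();
        for (String name : definedNames)
            values.put(name, supplied.get(name)); // null stands in for a default value
    }

    // Like config.get(PREDEFINED_CONFIG_NAME): throws for undefined names, records use.
    Object get(String name) {
        if (!values.containsKey(name))
            throw new IllegalArgumentException("Unknown configuration '" + name + "'");
        used.add(name);
        return values.get(name);
    }

    // Like config.originals().get(ANY_CONFIG_NAME): any supplied name, custom ones included.
    // Assumption for this toy: reading through the originals also marks the key as used.
    Object originalsGet(String name) {
        used.add(name);
        return originals.get(name);
    }

    // Like config.ignore(name): marks a defined-but-disabled config as used.
    void ignore(String name) {
        used.add(name);
    }

    // Supplied keys that nobody has read or ignored yet.
    Set<String> unused() {
        Set<String> keys = new HashSet<>(originals.keySet());
        keys.removeAll(used);
        return keys;
    }

    public static void main(String[] args) {
        Map<String, Object> supplied = new HashMap<>();
        supplied.put("linger.ms", 5);
        supplied.put("transaction.timeout.ms", 60003);
        supplied.put("my.module.custom", "x");
        ToyConfig config = new ToyConfig(Set.of("linger.ms", "transaction.timeout.ms"), supplied);

        config.get("linger.ms");                 // case 1: defined and used
        config.ignore("transaction.timeout.ms"); // case 2: defined but disabled
        System.out.println("unused before module init: " + config.unused());
        config.originalsGet("my.module.custom"); // case 3: the embedded module reads it later
        System.out.println("unused after module init: " + config.unused());
    }
}
```

At the moment the client would call `logUnused`, only the custom key from case 3) is still unused; whether it ever becomes "used" depends entirely on when the embedded module reads it, which is exactly the part outside the config object's control.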
----------------------------

If we want to resolve the latter issue as well in this JIRA, I'm thinking we can do something aligned with your proposal in the description, "AbstractConfig provides two new methods: logUnknown() and unknown()". To avoid creating a KIP for adding public APIs, we can just do it inside the existing `logUnused`, something like:

```
public Set<String> unused() {
    // the diff between values and used
    Set<String> keys = new HashSet<>(values.keySet());
    keys.removeAll(used);
    return keys;
}

// kept private so that no public API (and hence no KIP) is added
private Set<String> unknown() {
    // the diff between originals and values
    Set<String> keys = new HashSet<>(originals.keySet());
    keys.removeAll(values.keySet());
    return keys;
}

public void logUnused() {
    // still log one WARN line per unused config
    for (String key : unused())
        log.warn("The configuration '{}' was supplied but isn't used.", key);

    // log a single INFO line covering all unknown configs
    Set<String> unknownKeys = unknown();
    if (!unknownKeys.isEmpty())
        log.info("These configurations '{}' were not known.", unknownKeys);
}
```

> AbstractConfig log print information is incorrect
> -------------------------------------------------
>
>                 Key: KAFKA-13689
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13689
>             Project: Kafka
>          Issue Type: Bug
>          Components: config
>    Affects Versions: 3.0.0
>            Reporter: RivenSun
>            Assignee: RivenSun
>            Priority: Major
>             Fix For: 3.2.0
>
>
> h1. 1. Example
> KafkaClient version is 3.1.0, KafkaProducer init properties:
>
> {code:java}
> Properties props = new Properties();
> props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
> props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60003);
> {code}
>
> Partial log of KafkaProducer initialization:
> {code:java}
> ssl.truststore.location = C:\Personal File\documents\KafkaSSL\client.truststore.jks
> ssl.truststore.password = [hidden]
> ssl.truststore.type = JKS
> transaction.timeout.ms = 60003
> transactional.id = null
> value.serializer = class org.apache.kafka.common.serialization.StringSerializer
> [main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'transaction.timeout.ms' was supplied but isn't a known config.
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 37edeed0777bacb3
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1645602332999
> {code}
> From the above log, you can see that KafkaProducer has applied the user's configuration {*}transaction.timeout.ms=60003{*}; the default value of this configuration is 60000.
> But we can see another log line:
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration *'transaction.timeout.ms'* was supplied but isn't a *{color:#ff0000}known{color}* config.
>
> h1. 2. Root Cause:
> 1) ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG is set to {*}false{*}, so the configurations related to KafkaProducer transactions will not be requested.
> See the source code: KafkaProducer#configureTransactionState(...).
> 2) AbstractConfig#logUnused() -> AbstractConfig#unused()
> {code:java}
> public Set<String> unused() {
>     Set<String> keys = new HashSet<>(originals.keySet());
>     keys.removeAll(used);
>     return keys;
> }
> {code}
> If a configuration has not been requested, it will not be put into the {{used}} set. See the source code below:
> AbstractConfig#get(String key)
>
> {code:java}
> protected Object get(String key) {
>     if (!values.containsKey(key))
>         throw new ConfigException(String.format("Unknown configuration '%s'", key));
>     used.add(key);
>     return values.get(key);
> }
> {code}
>
> h1. Solution:
> 1. AbstractConfig#logUnused() method
> Change the message logged by this method, and lower the log level for unused configurations to {*}INFO{*}. What do you think?
> {code:java}
> /**
>  * Log info messages for any unused configurations
>  */
> public void logUnused() {
>     for (String key : unused())
>         log.info("The configuration '{}' was supplied but isn't used.", key);
> }
> {code}
>
> 2. AbstractConfig provides two new methods: logUnknown() and unknown()
> {code:java}
> /**
>  * Log warnings for any unknown configurations
>  */
> public void logUnknown() {
>     for (String key : unknown())
>         log.warn("The configuration '{}' was supplied but isn't a known config.", key);
> }
> {code}
>
> {code:java}
> public Set<String> unknown() {
>     Set<String> keys = new HashSet<>(originals.keySet());
>     keys.removeAll(values.keySet());
>     return keys;
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
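The set arithmetic behind the root cause and the proposed unknown() can be checked in isolation. The sketch below is a minimal, hypothetical helper: `ConfigKeySets` and the key `my.custom.key` are made up for illustration, and the defined and used sets are simplified assumptions rather than the real ProducerConfig state. It applies the quoted `unused()` logic (originals minus used) and the proposed `unknown()` logic (originals minus defined keys) to the example from the description.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper (not Kafka code) reproducing the set arithmetic from the issue.
final class ConfigKeySets {
    private ConfigKeySets() {}

    // Mirrors the quoted AbstractConfig#unused(): supplied keys that were never requested.
    static Set<String> unused(Set<String> originals, Set<String> used) {
        Set<String> keys = new HashSet<>(originals);
        keys.removeAll(used);
        return keys;
    }

    // Mirrors the proposed unknown(): supplied keys that are not defined configs at all.
    static Set<String> unknown(Set<String> originals, Set<String> definedKeys) {
        Set<String> keys = new HashSet<>(originals);
        keys.removeAll(definedKeys);
        return keys;
    }

    public static void main(String[] args) {
        // Simplified assumption: only these three keys are defined.
        Set<String> defined = Set.of("enable.idempotence", "transaction.timeout.ms", "transactional.id");
        // What the user supplied, plus a hypothetical custom key.
        Set<String> originals = Set.of("enable.idempotence", "transaction.timeout.ms", "my.custom.key");
        // With idempotence disabled, the transaction settings are never requested.
        Set<String> used = Set.of("enable.idempotence");

        System.out.println("unused  = " + unused(originals, used));
        System.out.println("unknown = " + unknown(originals, defined));
    }
}
```

Under these assumptions `transaction.timeout.ms` shows up in `unused()` because it was never requested, but it does not appear in `unknown()` because it is a defined config; only the genuinely undefined key does. That is why a WARN saying it "isn't a known config" is misleading for it.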