[ 
https://issues.apache.org/jira/browse/KAFKA-13689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17510767#comment-17510767
 ] 

Guozhang Wang commented on KAFKA-13689:
---------------------------------------

Okay I think we are really on the same page here, just are using different 
terminologies :) Where I'm from is the Streams usage, e.g. let's say we create 
a StreamsConfig object from:

```
StreamsConfig config = new StreamsConfig(map);
```

If we call

```
config.get(PREDEFINED_CONFIG_NAME)
```

then the param must be a defined config name, otherwise it will throw;

If we call

```
config.originals().get(ANY_CONFIG_NAME)
```

then it tries to get from the underlying maps directly, and any config names 
including the custom unknown configs can be retrieved. So that's how custom 
configs are retrieved (like you said, some modules use `(config.originals() 
UNION config.values()) .get(ANY_CONFIG_NAME).` as well).

Now, I realize since `logUnused` is triggered at the construction of the 
consumer/producer clients, whereas those embedded module's custom configs are 
possibly not yet retrieved as they will only be constructed and initialized at 
a later time, in which case `used` set would not yet contain any of those 
unknown configs yet. As a result:

1) all defined, used configs should be included in `used` since they are 
retrieved via the first call above. This is the case we want to guarantee and 
WARN if not since it may indicates a bug.
2) all defined, but disabled configs are included in `used` since they are 
called via `config.ignore()`. This is what we want to fix in this JIRA.
3) all unknown configs may or may not be included in `used` and that's out of 
the AbstractConfig object's control.

So by doing the above two, we can fix this JIRA ticket which is case 2), but 
the `logUnused` could still contain case 1) or case 3) whereas we really only 
want to WARN on case 1) above. The latter issue is a bit out of the scope of 
this JIRA, hence I said just doing the above two is sufficient for this issue.

----------------------------

If we want to resolve the second as well in this JIRA, I'm thinking we can do 
sth. aligned with your proposal in the description as "AbstractConfig provides 
two new methods: logUnknown() and unknown()" but to avoid creating a KIP for 
adding public APIs, we can just do them inside the existing `logUnused`, as 
something like:

```
    public Set<String> unused() {
        Set<String> keys = new HashSet<>(values.keySet());   // here we take 
the diff between values and used.
        keys.removeAll(used);
        return keys;
    }

    // keep it private
    private Set<String> unknown() {
        Set<String> keys = new HashSet<>(originals.keySet());   // here we take 
the diff between originals and values.
        keys.removeAll(values);
        return keys;
    }

public void logUnused() {
    Set<String> unusedkeys = unused(); 
    for (String key : unused())
        log.warn("The configuration '{}' was supplied but isn't a used.", key); 
// here we still log one line per config as a WARN

    Set<String> unusedkeys = unknown(); 
    if (!unknown.isEmpty()) {
        log.info("These configurations '{}' were not known.", unusedkeys);  // 
here we log one line for all configs as INFO.
    }
} 
```

> AbstractConfig log print information is incorrect
> -------------------------------------------------
>
>                 Key: KAFKA-13689
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13689
>             Project: Kafka
>          Issue Type: Bug
>          Components: config
>    Affects Versions: 3.0.0
>            Reporter: RivenSun
>            Assignee: RivenSun
>            Priority: Major
>             Fix For: 3.2.0
>
>
> h1. 1.Example
> KafkaClient version is 3.1.0, KafkaProducer init properties:
>  
> {code:java}
> Properties props = new Properties();
> props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
> props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60003);{code}
>  
>  
> Partial log of KafkaProducer initialization:
> {code:java}
>     ssl.truststore.location = C:\Personal 
> File\documents\KafkaSSL\client.truststore.jks
>     ssl.truststore.password = [hidden]
>     ssl.truststore.type = JKS
>     transaction.timeout.ms = 60003
>     transactional.id = null
>     value.serializer = class 
> org.apache.kafka.common.serialization.StringSerializer[main] INFO 
> org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully 
> logged in.
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration 'transaction.timeout.ms' was supplied but isn't a known config.
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 37edeed0777bacb3
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1645602332999 {code}
> From the above log, you can see that KafkaProducer has applied the user's 
> configuration, {*}transaction.timeout.ms=60003{*}, the default value of this 
> configuration is 60000.
> But we can see another line of log:
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration *'transaction.timeout.ms'* was supplied but isn't a 
> *{color:#ff0000}known{color}* config.
>  
> h1. 2.RootCause:
> 1) ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG is set to {*}false{*}.
> So the configurations related to the KafkaProducer transaction will not be 
> requested.
> See the source code: KafkaProducer#configureTransactionState(...) .
> 2) AbstractConfig#logUnused() -> AbstractConfig#unused()
> {code:java}
> public Set<String> unused() {
>     Set<String> keys = new HashSet<>(originals.keySet());
>     keys.removeAll(used);
>     return keys;
> } {code}
> If a configuration has not been requested, the configuration will not be put 
> into the used variable. SourceCode see as below:
> AbstractConfig#get(String key)
>  
> {code:java}
> protected Object get(String key) {
>     if (!values.containsKey(key))
>         throw new ConfigException(String.format("Unknown configuration '%s'", 
> key));
>     used.add(key);
>     return values.get(key);
> } {code}
> h1.  
> h1. Solution:
> 1. AbstractConfig#logUnused() method
> Modify the log printing information of this method,and the unused 
> configuration log print level can be changed to {*}INFO{*}, what do you think?
> {code:java}
> /**
>  * Log infos for any unused configurations
>  */
> public void logUnused() {     for (String key : unused())
>         log.info("The configuration '{}' was supplied but isn't a used 
> config.", key);
> }{code}
>  
>  
> 2. AbstractConfig provides two new methods: logUnknown() and unknown()
> {code:java}
> /**
>  * Log warnings for any unknown configurations
>  */
> public void logUnknown() {
>     for (String key : unknown())
>         log.warn("The configuration '{}' was supplied but isn't a known 
> config.", key);
> } {code}
>  
> {code:java}
> public Set<String> unknown() {
>     Set<String> keys = new HashSet<>(originals.keySet());
>     keys.removeAll(values.keySet());
>     return keys;
> } {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to