mjsax commented on code in PR #12988:
URL: https://github.com/apache/kafka/pull/12988#discussion_r1678407405


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1500,68 +1507,57 @@ private void validateRackAwarenessConfiguration() {
         });
     }
 
+    private void overwritePropertyMap(final Map<String, Object> props, final 
String key, final Object unmodifiableDefaultValue, final String config) {

Review Comment:
   ```suggestion
       private void overwritePropertyMap(final Map<String, Object> props, final 
String configName, final Object unmodifiableDefaultValue, final String 
clientType) {
   ```



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1500,68 +1507,57 @@ private void validateRackAwarenessConfiguration() {
         });
     }
 
+    private void overwritePropertyMap(final Map<String, Object> props, final 
String key, final Object unmodifiableDefaultValue, final String config) {
+        final String overwritePropertyLogMessage = "Unexpected %s config `%s` 
found. User setting (%s) will be ignored and the Kafka Streams default setting 
(%s) will be used";

Review Comment:
   ```suggestion
           final String overwritePropertyLogMessage = "Unexpected %s config 
`%s` found. User setting (%s) will be ignored and the Kafka Streams setting 
(%s) will be used";
   ```



##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -530,6 +530,14 @@ public void 
shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
         assertThat(consumerConfigs.get("internal.leave.group.on.close"), 
is(false));
     }
 
+    @Test
+    public void 
shouldResetToDefaultIfConsumerAllowAutoCreateTopicsIsOverridden() {

Review Comment:
   > Can you help me with how can I capture the log to verify WARN is being 
printed? I
   
   There is `org.apache.kafka.common.utils.LogCaptureAppender` class -- it's 
used in other tests, too, that you can use a blue print.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1609,6 +1605,8 @@ public Map<String, Object> getMainConsumerConfigs(final 
String groupId, final St
         final Map<String, Object> mainConsumerProps = 
originalsWithPrefix(MAIN_CONSUMER_PREFIX);
         consumerProps.putAll(mainConsumerProps);
 
+        validateConsumerPropertyMap(consumerProps);

Review Comment:
   See below: we set `group.id` and `client.id`...
   
   We also modify `group.instance.id` -- should we log a INFO level warn for 
this case (a WARN is not appropriate because it's nothing wrong with user 
provided configs...)
   
   We also do
   ```
   // disable auto topic creation
   consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
   ```
   which I believe we can remove in here, as it's already covered vis 
"unmodifiable configs" now?



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1182,41 +1171,59 @@ public class StreamsConfig extends AbstractConfig {
                     WINDOW_SIZE_MS_DOC);
     }
 
-    // this is the list of configs for underlying clients
-    // that streams prefer different default values
-    private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
+    private static final Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS;
     static {
         final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>();
         tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, 
"100");
-        PRODUCER_DEFAULT_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+
+        KS_DEFAULT_PRODUCER_CONFIGS = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
 
-    private static final Map<String, Object> PRODUCER_EOS_OVERRIDES;
+    private static final Map<String, Object> 
KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED;
     static {
-        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
+        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>(KS_DEFAULT_PRODUCER_CONFIGS);
         
tempProducerDefaultOverrides.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
Integer.MAX_VALUE);
-        
tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
true);
         // Reduce the transaction timeout for quicker pending offset 
expiration on broker side.
         
tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
DEFAULT_TRANSACTION_TIMEOUT);
 
-        PRODUCER_EOS_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+        KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+    }
+
+    private static final Map<String, Object> 
KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED;
+    static {
+        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>();
+        
tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
true);
+        
tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, null);
+
+        KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
 
-    private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;
+    private static final Map<String, Object> KS_DEFAULT_CONSUMER_CONFIGS;
     static {
         final Map<String, Object> tempConsumerDefaultOverrides = new 
HashMap<>();
         
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
"1000");
         
tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
-        
tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false");
         tempConsumerDefaultOverrides.put("internal.leave.group.on.close", 
false);
-        CONSUMER_DEFAULT_OVERRIDES = 
Collections.unmodifiableMap(tempConsumerDefaultOverrides);
+
+        KS_DEFAULT_CONSUMER_CONFIGS = 
Collections.unmodifiableMap(tempConsumerDefaultOverrides);
+    }
+
+    private static final Map<String, Object> KS_CONTROLLED_CONSUMER_CONFIGS;

Review Comment:
   We actually also control `group.id` and `client.id` (`client.id` for all 
clients...)



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1500,68 +1507,57 @@ private void validateRackAwarenessConfiguration() {
         });
     }
 
+    private void overwritePropertyMap(final Map<String, Object> props, final 
String key, final Object unmodifiableDefaultValue, final String config) {
+        final String overwritePropertyLogMessage = "Unexpected %s config `%s` 
found. User setting (%s) will be ignored and the Kafka Streams default setting 
(%s) will be used";

Review Comment:
   I would remove "default" here, because we also log this message for 
`bootstrap.server` which does not have a default (it would just come from 
`StreamsConfig`)



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1714,6 +1714,8 @@ public Map<String, Object> getGlobalConsumerConfigs(final 
String clientId) {
         final Map<String, Object> globalConsumerProps = 
originalsWithPrefix(GLOBAL_CONSUMER_PREFIX);
         baseConsumerProps.putAll(globalConsumerProps);
 
+        validateConsumerPropertyMap(baseConsumerProps);
+
         // no need to set group id for a global consumer
         baseConsumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);

Review Comment:
   As above.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1500,68 +1507,57 @@ private void validateRackAwarenessConfiguration() {
         });
     }
 
+    private void overwritePropertyMap(final Map<String, Object> props, final 
String key, final Object unmodifiableDefaultValue, final String config) {
+        final String overwritePropertyLogMessage = "Unexpected %s config `%s` 
found. User setting (%s) will be ignored and the Kafka Streams default setting 
(%s) will be used";
+        
+        if (props.containsKey(key) && (!Objects.equals(props.get(key), 
unmodifiableDefaultValue))) {
+            log.warn(String.format(overwritePropertyLogMessage, config, key, 
props.get(key), unmodifiableDefaultValue));

Review Comment:
   For `transaction.id` we would log `[...] Kafka Streams default setting 
(null) will be used"`. But this not accurate. In the end, we use
   - eos_v1: `<application.id>-<task_id>`
   - eos_v2: `<application.id>-<processId>-<threadIdx>`
   
   I think it's worth to put some if-else in here to have special handling for 
`transactional.id` and not print `null` but one of both expressions from above 
depending which version of eos is used.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1681,6 +1702,8 @@ public Map<String, Object> 
getRestoreConsumerConfigs(final String clientId) {
         final Map<String, Object> restoreConsumerProps = 
originalsWithPrefix(RESTORE_CONSUMER_PREFIX);
         baseConsumerProps.putAll(restoreConsumerProps);
 
+        validateConsumerPropertyMap(baseConsumerProps);
+
         // no need to set group id for a restore consumer
         baseConsumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);

Review Comment:
   This comments was not addressed yet. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to