mjsax commented on code in PR #12988: URL: https://github.com/apache/kafka/pull/12988#discussion_r1678407405
########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -1500,68 +1507,57 @@ private void validateRackAwarenessConfiguration() { }); } + private void overwritePropertyMap(final Map<String, Object> props, final String key, final Object unmodifiableDefaultValue, final String config) { Review Comment: ```suggestion private void overwritePropertyMap(final Map<String, Object> props, final String configName, final Object unmodifiableDefaultValue, final String clientType) { ``` ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -1500,68 +1507,57 @@ private void validateRackAwarenessConfiguration() { }); } + private void overwritePropertyMap(final Map<String, Object> props, final String key, final Object unmodifiableDefaultValue, final String config) { + final String overwritePropertyLogMessage = "Unexpected %s config `%s` found. User setting (%s) will be ignored and the Kafka Streams default setting (%s) will be used"; Review Comment: ```suggestion final String overwritePropertyLogMessage = "Unexpected %s config `%s` found. User setting (%s) will be ignored and the Kafka Streams setting (%s) will be used"; ``` ########## streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java: ########## @@ -530,6 +530,14 @@ public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() { assertThat(consumerConfigs.get("internal.leave.group.on.close"), is(false)); } + @Test + public void shouldResetToDefaultIfConsumerAllowAutoCreateTopicsIsOverridden() { Review Comment: > Can you help me with how can I capture the log to verify WARN is being printed? There is the `org.apache.kafka.common.utils.LogCaptureAppender` class -- it's used in other tests, too, which you can use as a blueprint. 
########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -1609,6 +1605,8 @@ public Map<String, Object> getMainConsumerConfigs(final String groupId, final St final Map<String, Object> mainConsumerProps = originalsWithPrefix(MAIN_CONSUMER_PREFIX); consumerProps.putAll(mainConsumerProps); + validateConsumerPropertyMap(consumerProps); Review Comment: See below: we set `group.id` and `client.id`... We also modify `group.instance.id` -- should we log an INFO-level message for this case (a WARN is not appropriate because there is nothing wrong with user-provided configs...)? We also do ``` // disable auto topic creation consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false"); ``` which I believe we can remove in here, as it's already covered via "unmodifiable configs" now? ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -1182,41 +1171,59 @@ public class StreamsConfig extends AbstractConfig { WINDOW_SIZE_MS_DOC); } - // this is the list of configs for underlying clients - // that streams prefer different default values - private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES; + private static final Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS; static { final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>(); tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, "100"); - PRODUCER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides); + + KS_DEFAULT_PRODUCER_CONFIGS = Collections.unmodifiableMap(tempProducerDefaultOverrides); } - private static final Map<String, Object> PRODUCER_EOS_OVERRIDES; + private static final Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED; static { - final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>(PRODUCER_DEFAULT_OVERRIDES); + final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>(KS_DEFAULT_PRODUCER_CONFIGS); 
tempProducerDefaultOverrides.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); - tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // Reduce the transaction timeout for quicker pending offset expiration on broker side. tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_TRANSACTION_TIMEOUT); - PRODUCER_EOS_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides); + KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED = Collections.unmodifiableMap(tempProducerDefaultOverrides); + } + + private static final Map<String, Object> KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED; + static { + final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>(); + tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, null); + + KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED = Collections.unmodifiableMap(tempProducerDefaultOverrides); } - private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES; + private static final Map<String, Object> KS_DEFAULT_CONSUMER_CONFIGS; static { final Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>(); tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000"); tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); tempConsumerDefaultOverrides.put("internal.leave.group.on.close", false); - CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides); + + KS_DEFAULT_CONSUMER_CONFIGS = Collections.unmodifiableMap(tempConsumerDefaultOverrides); + } + + private static final Map<String, Object> KS_CONTROLLED_CONSUMER_CONFIGS; Review Comment: We actually also control `group.id` and `client.id` (`client.id` for all clients...) 
########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -1500,68 +1507,57 @@ private void validateRackAwarenessConfiguration() { }); } + private void overwritePropertyMap(final Map<String, Object> props, final String key, final Object unmodifiableDefaultValue, final String config) { + final String overwritePropertyLogMessage = "Unexpected %s config `%s` found. User setting (%s) will be ignored and the Kafka Streams default setting (%s) will be used"; Review Comment: I would remove "default" here, because we also log this message for `bootstrap.servers` which does not have a default (it would just come from `StreamsConfig`) ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -1714,6 +1714,8 @@ public Map<String, Object> getGlobalConsumerConfigs(final String clientId) { final Map<String, Object> globalConsumerProps = originalsWithPrefix(GLOBAL_CONSUMER_PREFIX); baseConsumerProps.putAll(globalConsumerProps); + validateConsumerPropertyMap(baseConsumerProps); + // no need to set group id for a global consumer baseConsumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG); Review Comment: As above. ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -1500,68 +1507,57 @@ private void validateRackAwarenessConfiguration() { }); } + private void overwritePropertyMap(final Map<String, Object> props, final String key, final Object unmodifiableDefaultValue, final String config) { + final String overwritePropertyLogMessage = "Unexpected %s config `%s` found. User setting (%s) will be ignored and the Kafka Streams default setting (%s) will be used"; + + if (props.containsKey(key) && (!Objects.equals(props.get(key), unmodifiableDefaultValue))) { + log.warn(String.format(overwritePropertyLogMessage, config, key, props.get(key), unmodifiableDefaultValue)); Review Comment: For `transactional.id` we would log `[...] Kafka Streams default setting (null) will be used"`. 
But this is not accurate. In the end, we use - eos_v1: `<application.id>-<task_id>` - eos_v2: `<application.id>-<processId>-<threadIdx>` I think it's worth putting some if-else in here to have special handling for `transactional.id` and not print `null` but one of the two expressions from above, depending on which version of eos is used. ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -1681,6 +1702,8 @@ public Map<String, Object> getRestoreConsumerConfigs(final String clientId) { final Map<String, Object> restoreConsumerProps = originalsWithPrefix(RESTORE_CONSUMER_PREFIX); baseConsumerProps.putAll(restoreConsumerProps); + validateConsumerPropertyMap(baseConsumerProps); + // no need to set group id for a restore consumer baseConsumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG); Review Comment: This comment was not addressed yet. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org