Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]
mjsax commented on code in PR #12988:
URL: https://github.com/apache/kafka/pull/12988#discussion_r1645262881

## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ##
@@ -999,6 +988,10 @@ public class StreamsConfig extends AbstractConfig {
     (name, value) -> verifyTopologyOptimizationConfigs((String) value),
     Importance.MEDIUM,
     TOPOLOGY_OPTIMIZATION_DOC)
+.define(ProducerConfig.PARTITIONER_CLASS_CONFIG,

Review Comment:
Why do we add this here? It's not a StreamsConfig, and seems it should be added?

## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ##
@@ -1182,41 +1175,64 @@ public class StreamsConfig extends AbstractConfig {
     WINDOW_SIZE_MS_DOC);
 }

-// this is the list of configs for underlying clients
-// that streams prefer different default values
-private static final Map PRODUCER_DEFAULT_OVERRIDES;
+// KS_DEFAULT_PRODUCER_CONFIGS - default producer configs for Kafka Streams

Review Comment:
```suggestion
// default producer configs for Kafka Streams
```

## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ##
@@ -1182,41 +1175,64 @@ public class StreamsConfig extends AbstractConfig {
     WINDOW_SIZE_MS_DOC);
 }

-// this is the list of configs for underlying clients
-// that streams prefer different default values
-private static final Map PRODUCER_DEFAULT_OVERRIDES;
+// KS_DEFAULT_PRODUCER_CONFIGS - default producer configs for Kafka Streams
+private static final Map KS_DEFAULT_PRODUCER_CONFIGS;
 static {
     final Map tempProducerDefaultOverrides = new HashMap<>();
     tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, "100");
-    PRODUCER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides);
+
+    KS_DEFAULT_PRODUCER_CONFIGS = Collections.unmodifiableMap(tempProducerDefaultOverrides);
 }

-private static final Map PRODUCER_EOS_OVERRIDES;
+// KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED - default producer configs for Kafka Streams with EOS enabled
+private static final Map KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED;
 static {
-    final Map tempProducerDefaultOverrides = new HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
+    final Map tempProducerDefaultOverrides = new HashMap<>(KS_DEFAULT_PRODUCER_CONFIGS);
     tempProducerDefaultOverrides.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
-    tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
-    // Reduce the transaction timeout for quicker pending offset expiration on broker side.
     tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_TRANSACTION_TIMEOUT);
-    PRODUCER_EOS_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides);
+    KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED = Collections.unmodifiableMap(tempProducerDefaultOverrides);
+}
+
+// KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED - Kafka Streams producer configs that cannot be overridden by the user with EOS enabled
+private static final Map KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED;
+static {
+    final Map tempProducerDefaultOverrides = new HashMap<>();
+    tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+    tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, null);
+
+    KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED = Collections.unmodifiableMap(tempProducerDefaultOverrides);
 }

-private static final Map CONSUMER_DEFAULT_OVERRIDES;
+// KS_DEFAULT_CONSUMER_CONFIGS - default consumer configs for Kafka Streams
+private static final Map KS_DEFAULT_CONSUMER_CONFIGS;
 static {
     final Map tempConsumerDefaultOverrides = new HashMap<>();
     tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
     tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-    tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
     tempConsumerDefaultOverrides.put("internal.leave.group.on.close", false);
-    CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
+
+    KS_DEFAULT_CONSUMER_CONFIGS = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
 }

-private static final Map CONSUMER_EOS_OVERRIDES;
+// KS_CONTROLLED_CONSUMER_CONFIGS - Kafka Streams consumer configs that cannot be overridden by the user

Review Comment:
```suggestion
// Kafka Streams consumer configs that cannot be overridden by the user
```

## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@
Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]
ashmeet13 commented on PR #12988:
URL: https://github.com/apache/kafka/pull/12988#issuecomment-2145719503

@mjsax I have added the handling for the `partitioner.class` config based on this logic -
1. If the configured class implements `StreamPartitioner`, we let it pass.
2. If it doesn't, we remove the key from the properties and log a warning.

Adding test cases now.
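The two rules in the comment above can be sketched roughly as follows. This is a minimal illustration and not the PR's code: `StreamPartitionerStandIn`, `GoodPartitioner`, `PlainPartitioner`, and `sanitize` are hypothetical names, the stand-in interface replaces the real `StreamPartitioner`, and the sketch assumes the config value is a `Class` object (a real config may also carry a class name as a string).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

public class PartitionerCheck {
    private static final Logger LOG = Logger.getLogger(PartitionerCheck.class.getName());
    static final String PARTITIONER_CLASS = "partitioner.class";

    /** Hypothetical stand-in for Kafka Streams' StreamPartitioner interface. */
    interface StreamPartitionerStandIn { }

    /** Example class that implements the stand-in interface. */
    static class GoodPartitioner implements StreamPartitionerStandIn { }

    /** Example class that does not implement it. */
    static class PlainPartitioner { }

    /** Returns a copy of the user props with an unsuitable partitioner.class removed. */
    static Map<String, Object> sanitize(Map<String, Object> userProps) {
        Map<String, Object> props = new HashMap<>(userProps);
        Object value = props.get(PARTITIONER_CLASS);
        if (value == null) {
            return props; // nothing configured, nothing to check
        }
        boolean allowed = value instanceof Class
            && StreamPartitionerStandIn.class.isAssignableFrom((Class<?>) value);
        if (!allowed) {
            // Rule 2: drop the key and tell the user about it.
            LOG.warning("Ignoring user-provided " + PARTITIONER_CLASS + "=" + value
                + " because it does not implement the expected partitioner interface");
            props.remove(PARTITIONER_CLASS);
        }
        return props; // Rule 1: a suitable class passes through untouched
    }
}
```

Returning a copy keeps the caller's map unchanged, which makes the behavior easy to assert on in a unit test.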
Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]
ashmeet13 commented on PR #12988:
URL: https://github.com/apache/kafka/pull/12988#issuecomment-2143755742

Hi @mjsax, requesting a review on this PR. There is still one open case that I am not sure how we should handle: the check @ableegoldman mentioned in the comment [here](https://github.com/apache/kafka/pull/12988/#pullrequestreview-1225470833) to handle the `ProducerConfig.PARTITIONER_CLASS_CONFIG` (`partitioner.class`) config.
Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]
ashmeet13 commented on PR #12988: URL: https://github.com/apache/kafka/pull/12988#issuecomment-2096203457 Fixing the failing build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]
ashmeet13 commented on code in PR #12988:
URL: https://github.com/apache/kafka/pull/12988#discussion_r1590353376

## streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java: ##
@@ -530,6 +530,14 @@ public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
     assertThat(consumerConfigs.get("internal.leave.group.on.close"), is(false));
 }

+@Test
+public void shouldResetToDefaultIfConsumerAllowAutoCreateTopicsIsOverridden() {

Review Comment:
Yes, I will extend this test to cover all the consumers. Can you point me to how I can capture the log to verify that the WARN is printed? I think it's a good-to-have test case. Is there any test that already does this?
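On the log-capturing question, one generic approach is to attach a temporary handler to the logger, run the code under test, and inspect what was recorded. The sketch below uses only `java.util.logging` as a stand-in; it does not use Kafka's logging setup or test utilities, and `WarnCapture`, `overrideControlledConfig`, and the logger name are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

public class WarnCapture {
    /** Handler that simply remembers every record it receives. */
    static class CapturingHandler extends Handler {
        final List<LogRecord> records = new ArrayList<>();
        @Override public void publish(LogRecord record) { records.add(record); }
        @Override public void flush() { }
        @Override public void close() { }
    }

    // Hypothetical logger standing in for the one used by the config class.
    static final Logger LOG = Logger.getLogger("streams.config.sketch");

    /** Hypothetical code under test: warns when a controlled config is overridden. */
    static void overrideControlledConfig(String key) {
        LOG.warning("Unexpected user-specified config " + key
            + "; overriding it with the Streams-controlled value");
    }

    /** Runs the action and returns the WARNING messages logged while it ran. */
    static List<String> captureWarnings(Runnable action) {
        CapturingHandler handler = new CapturingHandler();
        LOG.addHandler(handler);
        try {
            action.run();
        } finally {
            LOG.removeHandler(handler); // always detach so other tests are unaffected
        }
        List<String> warnings = new ArrayList<>();
        for (LogRecord record : handler.records) {
            if (record.getLevel() == Level.WARNING) {
                warnings.add(record.getMessage());
            }
        }
        return warnings;
    }
}
```

A test would then assert on the returned list, for example that exactly one warning mentions `allow.auto.create.topics`.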
Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]
ashmeet13 commented on PR #12988:
URL: https://github.com/apache/kafka/pull/12988#issuecomment-2094870332

Below are the configs for each client type, marked by whether each one is -
1. A custom default for KS that is still editable by the user
2. Or, a fixed value controlled by KS

### Producer Configs
```markdown
EoS Disabled
1. [Editable] [CustomDefault] linger.ms = 100
2. [Fixed] partitioner.class = StreamsPartitioner

EoS Enabled
1. [Editable] [CustomDefault] linger.ms = 100
2. [Editable] [CustomDefault] delivery.timeout.ms = Integer.MAX_VALUE
3. [Editable] [CustomDefault] transaction.timeout.ms = 1
4. [Fixed] partitioner.class = StreamsPartitioner
5. [Fixed] enable.idempotence = true
6. [Fixed] transactional.id = -
7. [Validate] max.in.flight.requests.per.connection <= 5
```

### Main Consumer Configs
```markdown
EoS Disabled
1. [Editable] [CustomDefault] auto.offset.reset = earliest
2. [Editable] [CustomDefault] max.poll.records = 1000
3. [Fixed] allow.auto.create.topics = false
4. [Fixed] enable.auto.commit = false
5. [Fixed] group.id =

EoS Enabled
1. [Editable] [CustomDefault] auto.offset.reset = earliest
2. [Editable] [CustomDefault] max.poll.records = 1000
3. [Fixed] allow.auto.create.topics = false
4. [Fixed] enable.auto.commit = false
5. [Fixed] group.id =
```

### Global Consumer Configs
```markdown
EoS Disabled
1. [Editable] [CustomDefault] max.poll.records = 1000
2. [Fixed] auto.offset.reset = None
3. [Fixed] allow.auto.create.topics = false
4. [Fixed] enable.auto.commit = false
5. [Fixed] group.id = None

EoS Enabled
1. [Editable] [CustomDefault] max.poll.records = 1000
2. [Fixed] auto.offset.reset = None
3. [Fixed] allow.auto.create.topics = false
4. [Fixed] enable.auto.commit = false
5. [Fixed] group.id = None
```

### Restore Consumer Configs
```markdown
EoS Disabled
1. [Editable] [CustomDefault] max.poll.records = 1000
2. [Fixed] auto.offset.reset = None
3. [Fixed] allow.auto.create.topics = false
4. [Fixed] enable.auto.commit = false
5. [Fixed] group.id = None

EoS Enabled
1. [Editable] [CustomDefault] max.poll.records = 1000
2. [Fixed] auto.offset.reset = None
3. [Fixed] allow.auto.create.topics = false
4. [Fixed] enable.auto.commit = false
5. [Fixed] group.id = None
```

There are a few more that are set ad hoc in the code that I haven't included; covering them seemed like a broader change to Streams configs.
Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]
ashmeet13 commented on PR #12988:
URL: https://github.com/apache/kafka/pull/12988#issuecomment-2094869760

Hi @mjsax, apologies for being so absent on this PR. I have gone ahead and implemented the changes. The tests are pending and I am currently working on them. Detailing the implementation below.

There are two pieces that KS controls -
1. Custom Default - Configs whose default values in KS differ from the clients' own defaults. These values can still be overwritten by the user.
2. Controlled Configs - Configs that are controlled by KS and cannot be overwritten by the user (we want to warn the user that a value they set is being overwritten).

### Previous Implementation -
1. We used to have the following data structures
```java
String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS // Controlled KS Consumer Configs
String[] NON_CONFIGURABLE_CONSUMER_EOS_CONFIG // Controlled KS Consumer Configs when EoS enabled
String[] NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS // Controlled KS Producer Config when EoS enabled

Map PRODUCER_DEFAULT_OVERRIDES // Producer Custom Default + Controlled Config Values
Map PRODUCER_EOS_OVERRIDES // Producer Custom Default + Controlled Config Values with EoS
Map CONSUMER_DEFAULT_OVERRIDES // Consumer Custom Default + Controlled Config Values
Map CONSUMER_EOS_OVERRIDES // Consumer Custom Default + Controlled Config Values with EoS
```
2. The steps to return the required config broadly were:
   1. **Get client configs**: Gather client configurations with the prefix `consumer.` or `producer.` and put them in `clientProvidedProps`.
   2. **Clean `clientProvidedProps`**: Use the method `checkIfUnexpectedUserSpecifiedConsumerConfig` to remove controlled configs from `clientProvidedProps`.
   3. **Create `props`**: Generate `props` using either `<>_DEFAULT_OVERRIDES` or `<>_EOS_OVERRIDES`.
   4. **Overwrite `props`**: Overwrite entries in `props` with the cleaned `clientProvidedProps`.
   5. **Fetch additional configs (only for consumer props)**: If it's consumer props, fetch configurations set using `main.consumer.`, `global.consumer.`, or `restore.consumer.` and add them to the `props` map.
3. After the initial setup, we make some tweaks based on whether it's for a consumer or producer, and then we hand back the props.

### Current Implementation -
1. Do away with the old data structures and define the following new ones -
```java
Map KS_DEFAULT_PRODUCER_CONFIGS // KS Custom Defaults for Producer
Map KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED // KS Custom Defaults for Producer with EoS
Map KS_CONTROLLED_PRODUCER_CONFIGS // KS Controlled Configs for Producer
Map KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED // KS Controlled Configs for Producer with EoS

Map KS_DEFAULT_CONSUMER_CONFIGS // KS Custom Defaults for Consumer
Map KS_CONTROLLED_CONSUMER_CONFIGS // KS Controlled Configs for Consumer
Map KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED // KS Controlled Configs for Consumer with EoS
```
2. The steps to return the required config now are:
   1. **Get client configs**: Obtain client configurations with the prefix `consumer.` or `producer.` and place them in `clientProvidedProps`.
   2. **Create `props`**: Generate `props` using either `KS_DEFAULT_<>_CONFIGS` or `KS_DEFAULT_<>_CONFIGS_EOS_ENABLED`.
   3. **Overwrite `props`**: Overwrite entries in `props` with `clientProvidedProps` (there is no separate cleaning step anymore).
   4. **Fetch additional configs (only for consumer props)**: If it's consumer props, fetch configurations set using `main.consumer.`, `global.consumer.`, or `restore.consumer.` and add them to the `props` map.
   5. **Run validation check over `props`**: This check uses the `KS_CONTROLLED_<>_CONFIGS` or `KS_CONTROLLED_<>_CONFIGS_EOS_ENABLED` maps to see if the values are already set in `props`. If they are, log a warning and overwrite them. If not, add them to `props`.
3. After the initial setup, we make some tweaks based on whether it's for a consumer or producer, and then we hand back the props.

In the next comment, I'll share the configurations organized into custom defaults and controlled configs for both consumers and producers.
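The "Current Implementation" steps for the producer case can be sketched as below. This is an illustration under assumptions, not the PR's code: the map names mirror the ones listed in the comment, but their contents here are a small subset taken from the thread, and `producerConfigs` and its `warnings` parameter are hypothetical (the list stands in for the logged warnings so the behavior is easy to inspect).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ProducerConfigPipeline {
    // Custom defaults: preferred by Streams, but the user may override them.
    static final Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS =
        Map.of("linger.ms", "100");
    static final Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED =
        Map.of("linger.ms", "100", "delivery.timeout.ms", Integer.MAX_VALUE);

    // Controlled configs: always win over user input (illustrative subset).
    static final Map<String, Object> KS_CONTROLLED_PRODUCER_CONFIGS = Map.of();
    static final Map<String, Object> KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED =
        Map.of("enable.idempotence", true);

    static Map<String, Object> producerConfigs(Map<String, Object> clientProvidedProps,
                                               boolean eosEnabled,
                                               List<String> warnings) {
        // Step 2: start from the Streams defaults.
        Map<String, Object> props = new HashMap<>(
            eosEnabled ? KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED : KS_DEFAULT_PRODUCER_CONFIGS);
        // Step 3: user-provided values override the defaults.
        props.putAll(clientProvidedProps);
        // Step 5: controlled values are applied last; warn if the user had set them.
        Map<String, Object> controlled =
            eosEnabled ? KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED : KS_CONTROLLED_PRODUCER_CONFIGS;
        for (Map.Entry<String, Object> entry : controlled.entrySet()) {
            if (props.containsKey(entry.getKey())) {
                warnings.add("Overriding user-specified " + entry.getKey() + "="
                    + props.get(entry.getKey()) + " with " + entry.getValue());
            }
            props.put(entry.getKey(), entry.getValue());
        }
        return props;
    }
}
```

Applying the controlled map as the final step is what guarantees that no earlier source of user input can leave a controlled config at a user-chosen value.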
Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]
ashmeet13 commented on code in PR #12988:
URL: https://github.com/apache/kafka/pull/12988#discussion_r1589029328

## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ##
@@ -1202,16 +1202,47 @@ public class StreamsConfig extends AbstractConfig {
     PRODUCER_EOS_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides);
 }

+private static final Map KS_DEFAULT_CONSUMER_CONFIGS;
+static {
+    final Map tempConsumerDefaultOverrides = new HashMap<>();
+    tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
+    tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    tempConsumerDefaultOverrides.put("internal.leave.group.on.close", false);
+
+    KS_CONTROLLED_CONSUMER_CONFIGS = Collections.unmodifiableMap(tempConsumerDefaultOverrides);

Review Comment:
`KS_DEFAULT_CONSUMER_CONFIGS` is a map of defaults that Kafka Streams prefers. It does not lock these properties; the user can overwrite these configs.

`KS_CONTROLLED_CONSUMER_CONFIGS` and `KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED` are the configs we do not allow to be overwritten. If the user sets any of them, KS will overwrite the user's values with the values set in these two maps.
Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]
mjsax commented on code in PR #12988:
URL: https://github.com/apache/kafka/pull/12988#discussion_r1588576144

## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ##
@@ -1202,16 +1202,47 @@ public class StreamsConfig extends AbstractConfig {
     PRODUCER_EOS_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides);
 }

+private static final Map KS_DEFAULT_CONSUMER_CONFIGS;
+static {
+    final Map tempConsumerDefaultOverrides = new HashMap<>();
+    tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
+    tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    tempConsumerDefaultOverrides.put("internal.leave.group.on.close", false);
+
+    KS_CONTROLLED_CONSUMER_CONFIGS = Collections.unmodifiableMap(tempConsumerDefaultOverrides);

Review Comment:
We do allow users to modify these configs. I don't think we want to lock it down?
Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]
mjsax commented on PR #12988: URL: https://github.com/apache/kafka/pull/12988#issuecomment-2084303833 @ashmeet13 -- any updates on this PR?
Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]
mjsax commented on code in PR #12988:
URL: https://github.com/apache/kafka/pull/12988#discussion_r1509672158

## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ##
@@ -1140,6 +1145,7 @@ public class StreamsConfig extends AbstractConfig {
 static {
     final Map tempProducerDefaultOverrides = new HashMap<>();
     tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, "100");
+    tempProducerDefaultOverrides.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "none");

Review Comment:
Default is already `null` -- why do we need to set it?

## streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java: ##
@@ -530,6 +530,14 @@ public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
     assertThat(consumerConfigs.get("internal.leave.group.on.close"), is(false));
 }

+@Test
+public void shouldResetToDefaultIfConsumerAllowAutoCreateTopicsIsOverridden() {

Review Comment:
This should apply to all consumers, right? Should we extend the test accordingly? Should we also capture the logs and verify that the WARN is printed (not sure if necessary)?

## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ##
@@ -1883,3 +1886,20 @@ public static void main(final String[] args) {
     System.out.println(CONFIG.toHtml(4, config -> "streamsconfigs_" + config));
 }
 }
+
+
+public Map getMainConsumerConfigs(...) {

Review Comment:
`StreamsConfig` is public API and we cannot just modify it w/o a KIP. -- Also, why do we need this new method to begin with? We already have `getMainConsumerConfigs(...)`.
Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]
ashmeet13 commented on PR #12988: URL: https://github.com/apache/kafka/pull/12988#issuecomment-1842424947 Hi @mjsax apologies for the delay. Pushing this soon.
Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]
mjsax commented on PR #12988: URL: https://github.com/apache/kafka/pull/12988#issuecomment-1831199541 @ashmeet13 -- Any update on this PR? We are coming up on the 3.7 release code freeze deadline. Might be nice to finish this on time?
Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]
ashmeet13 commented on PR #12988:
URL: https://github.com/apache/kafka/pull/12988#issuecomment-1762139883

Got it! I'll make this change. For now, I have gone through the code and the following two references and compiled a list of configs that are in some way "controlled" by KS. Sharing the Producer Configs here for now; the Consumer Configs will follow soon.

**Producer Configs with EoS Disabled**
```
1. [Editable] [CustomDefault] linger.ms = 100
2. [Fixed] partitioner.class = StreamsPartitioner
```

**Producer Configs with EoS Enabled**
```
1. [Editable] [CustomDefault] linger.ms = 100
2. [Fixed] partitioner.class = StreamsPartitioner
3. [Fixed] enable.idempotence = true
4. [Validate] max.in.flight.requests.per.connection <= 5
5. [Fixed] [NoDefault] transactional.id = -
6. [Editable] [CustomDefault] delivery.timeout.ms = Integer.MAX_VALUE
7. [Editable] [CustomDefault] transaction.timeout.ms = 1
```
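The `[Validate]` entry differs from the `[Fixed]` ones: the user's value is kept, but only if it stays within a limit. A minimal sketch of such a check follows, assuming the limit of 5 from the list above. `MaxInFlightCheck` and `validate` are hypothetical names, and `IllegalArgumentException` is used only for illustration; it is not a claim about which exception Kafka Streams throws.

```java
import java.util.Map;

public class MaxInFlightCheck {
    static final String MAX_IN_FLIGHT = "max.in.flight.requests.per.connection";
    static final int EOS_MAX_IN_FLIGHT = 5; // limit taken from the list above

    /** Rejects a user-provided value above the limit when EoS is enabled. */
    static void validate(Map<String, ?> producerProps, boolean eosEnabled) {
        if (!eosEnabled) {
            return; // the limit only applies with EoS enabled
        }
        Object raw = producerProps.get(MAX_IN_FLIGHT);
        if (raw == null) {
            return; // not set by the user, so the client default applies
        }
        final int value;
        try {
            // Config values may arrive as numbers or as strings.
            value = Integer.parseInt(raw.toString().trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                "Invalid value " + raw + " for " + MAX_IN_FLIGHT, e);
        }
        if (value > EOS_MAX_IN_FLIGHT) {
            throw new IllegalArgumentException(MAX_IN_FLIGHT + "=" + value
                + " exceeds the maximum of " + EOS_MAX_IN_FLIGHT + " allowed with EoS enabled");
        }
    }
}
```

Unlike a controlled config, nothing is overwritten here: the value either passes unchanged or the configuration is rejected.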
Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]
mjsax commented on PR #12988:
URL: https://github.com/apache/kafka/pull/12988#issuecomment-1760676675

Thanks for digging into this -- I think you are spot on -- it seems we should extract a method that will set KS controlled configs, and refactor `getMainConsumerConfigs` to first call `getCommonConsumerConfigs()`, then apply `main.consumer` configs, and in a last step call the new method to set KS controlled configs. I assume we need to do something similar for the restore and global consumers? -- To be fair, I was actually aware that something is off and still have an (old and stale) local branch adding corresponding testing to `StreamsConfigTest` to verify that the overwrite hierarchy works as expected... Would be great if you could also look into this test...
Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]
ashmeet13 commented on PR #12988:
URL: https://github.com/apache/kafka/pull/12988#issuecomment-1752128811

Got it @mjsax - Sharing the code that seems to be causing this bypass. Currently, to fetch any consumer config, i.e. `main`, `restore` or `global`, we use a common function, `getCommonConsumerConfigs`.

It's within the `getCommonConsumerConfigs` function that we check and override the configs preferred by Streams -
```java
private Map<String, Object> getCommonConsumerConfigs() {
    // Fetch all consumer props starting with "consumer."
    final Map<String, Object> clientProvidedProps =
        getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());

    // Clean out any properties that were set but need to be controlled by streams
    checkIfUnexpectedUserSpecifiedConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
    checkIfUnexpectedUserSpecifiedConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);

    // Create a config map of the preferred props and merge it with the cleaned props from above
    final Map<String, Object> consumerProps =
        new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
    consumerProps.putAll(clientProvidedProps);
    return consumerProps;
}
```

And the logic within `getMainConsumerConfigs` is -
```java
public Map<String, Object> getMainConsumerConfigs(...) {
    // Fetch the props starting with "consumer." after cleaning
    // any props that needed to be overwritten
    final Map<String, Object> consumerProps = getCommonConsumerConfigs();

    // Get main consumer override props i.e. the ones
    // starting with "main.consumer." and merge the two maps.
    final Map<String, Object> mainConsumerProps = originalsWithPrefix(MAIN_CONSUMER_PREFIX);
    for (final Map.Entry<String, Object> entry : mainConsumerProps.entrySet()) {
        consumerProps.put(entry.getKey(), entry.getValue());
    }

    // Continue processing and filling in other required configs
}
```

Do you think I've understood this piece correctly? If so, should a fix for this go into this PR itself?
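The bypass described in this comment can be reproduced in isolation with plain maps. The sketch below is a simplified model of the ordering the comment describes, not the actual `StreamsConfig` code: `ConsumerOverrideBypass`, `withPrefix`, and `mainConsumerConfigs` are hypothetical, and only `enable.auto.commit` is treated as controlled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class ConsumerOverrideBypass {
    // Illustrative subset of configs that Streams is meant to control.
    static final Set<String> CONTROLLED = Set.of("enable.auto.commit");

    /** Returns the entries whose key starts with the prefix, with the prefix stripped. */
    static Map<String, Object> withPrefix(Map<String, ?> originals, String prefix) {
        Map<String, Object> out = new HashMap<>();
        for (Map.Entry<String, ?> entry : originals.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                out.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return out;
    }

    /** Models the ordering described above: clean "consumer." props, then merge "main.consumer." props. */
    static Map<String, Object> mainConsumerConfigs(Map<String, ?> originals) {
        Map<String, Object> clientProvided = withPrefix(originals, "consumer.");
        clientProvided.keySet().removeAll(CONTROLLED); // cleaning happens only here

        Map<String, Object> props = new HashMap<>();
        props.put("enable.auto.commit", "false");      // Streams-preferred value
        props.putAll(clientProvided);

        // The "main.consumer." props are merged afterwards without any cleaning,
        // so a controlled key set through this prefix survives.
        props.putAll(withPrefix(originals, "main.consumer."));
        return props;
    }
}
```

With this model, `consumer.enable.auto.commit=true` is dropped as intended, while `main.consumer.enable.auto.commit=true` ends up in the final map, which is the gap that applying the controlled configs as a last step would close.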