mjsax commented on code in PR #12988:
URL: https://github.com/apache/kafka/pull/12988#discussion_r1645262881


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -999,6 +988,10 @@ public class StreamsConfig extends AbstractConfig {
                     (name, value) -> 
verifyTopologyOptimizationConfigs((String) value),
                     Importance.MEDIUM,
                     TOPOLOGY_OPTIMIZATION_DOC)
+            .define(ProducerConfig.PARTITIONER_CLASS_CONFIG,

Review Comment:
   Why do we add this here? It's not a StreamsConfig, and it seems it should not be added?



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1182,41 +1175,64 @@ public class StreamsConfig extends AbstractConfig {
                     WINDOW_SIZE_MS_DOC);
     }
 
-    // this is the list of configs for underlying clients
-    // that streams prefer different default values
-    private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
+    // KS_DEFAULT_PRODUCER_CONFIGS - default producer configs for Kafka Streams

Review Comment:
   ```suggestion
       // default producer configs for Kafka Streams
   ```



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1182,41 +1175,64 @@ public class StreamsConfig extends AbstractConfig {
                     WINDOW_SIZE_MS_DOC);
     }
 
-    // this is the list of configs for underlying clients
-    // that streams prefer different default values
-    private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
+    // KS_DEFAULT_PRODUCER_CONFIGS - default producer configs for Kafka Streams
+    private static final Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS;
     static {
         final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>();
         tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, 
"100");
-        PRODUCER_DEFAULT_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+
+        KS_DEFAULT_PRODUCER_CONFIGS = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
 
-    private static final Map<String, Object> PRODUCER_EOS_OVERRIDES;
+    // KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED - default producer configs for 
Kafka Streams with EOS enabled
+    private static final Map<String, Object> 
KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED;
     static {
-        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
+        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>(KS_DEFAULT_PRODUCER_CONFIGS);
         
tempProducerDefaultOverrides.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
Integer.MAX_VALUE);
-        
tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
true);
-        // Reduce the transaction timeout for quicker pending offset 
expiration on broker side.
         
tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
DEFAULT_TRANSACTION_TIMEOUT);
 
-        PRODUCER_EOS_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+        KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+    }
+
+    // KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED - Kafka Streams producer 
configs that cannot be overridden by the user with EOS enabled
+    private static final Map<String, Object> 
KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED;
+    static {
+        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>();
+        
tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
true);
+        
tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, null);
+
+        KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
 
-    private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;
+    // KS_DEFAULT_CONSUMER_CONFIGS - default consumer configs for Kafka Streams
+    private static final Map<String, Object> KS_DEFAULT_CONSUMER_CONFIGS;
     static {
         final Map<String, Object> tempConsumerDefaultOverrides = new 
HashMap<>();
         
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
"1000");
         
tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
-        
tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false");
         tempConsumerDefaultOverrides.put("internal.leave.group.on.close", 
false);
-        CONSUMER_DEFAULT_OVERRIDES = 
Collections.unmodifiableMap(tempConsumerDefaultOverrides);
+
+        KS_DEFAULT_CONSUMER_CONFIGS = 
Collections.unmodifiableMap(tempConsumerDefaultOverrides);
     }
 
-    private static final Map<String, Object> CONSUMER_EOS_OVERRIDES;
+    // KS_CONTROLLED_CONSUMER_CONFIGS - Kafka Streams consumer configs that 
cannot be overridden by the user

Review Comment:
   ```suggestion
       // Kafka Streams consumer configs that cannot be overridden by the user
   ```



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1500,68 +1516,71 @@ private void validateRackAwarenessConfiguration() {
         });
     }
 
+    private void overwritePropertyMap(final Map<String, Object> props, final 
String key, final Object value, final String config) {
+        final String overwritePropertyLogMessage = "Unexpected %s config: %s 
found. User setting (%s) will be ignored and the Streams default setting (%s) 
will be used";
+        
+        if (props.containsKey(key) && (!Objects.equals(props.get(key), 
value))) {
+            log.warn(String.format(overwritePropertyLogMessage, config, key, 
props.get(key), value));
+        }
+
+        props.put(key, value);
+    }
+
     private Map<String, Object> getCommonConsumerConfigs() {
         final Map<String, Object> clientProvidedProps = 
getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
 
-        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, 
NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
-        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, 
NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
-
-        final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? 
CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
+        // Create a consumer config map with custom default values set by 
Kafka Streams
+        final Map<String, Object> consumerProps = new 
HashMap<>(KS_DEFAULT_CONSUMER_CONFIGS);
         if (StreamsConfigUtils.processingMode(this) == 
StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2) {
             
consumerProps.put("internal.throw.on.fetch.stable.offset.unsupported", true);
         }
+        
         consumerProps.putAll(getClientCustomProps());
         consumerProps.putAll(clientProvidedProps);
 
         // bootstrap.servers should be from StreamsConfig
-        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
originals().get(BOOTSTRAP_SERVERS_CONFIG));
+        overwritePropertyMap(
+            consumerProps,
+            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
+            originals().get(BOOTSTRAP_SERVERS_CONFIG),
+            "consumer"
+        );
 
         return consumerProps;
     }
 
-    private void checkIfUnexpectedUserSpecifiedConsumerConfig(final 
Map<String, Object> clientProvidedProps,
-                                                              final String[] 
nonConfigurableConfigs) {
-        // Streams does not allow users to configure certain consumer/producer 
configurations, for example,
-        // enable.auto.commit. In cases where user tries to override such 
non-configurable
-        // consumer/producer configurations, log a warning and remove the user 
defined value from the Map.
-        // Thus the default values for these consumer/producer configurations 
that are suitable for
-        // Streams will be used instead.
-
-        final String nonConfigurableConfigMessage = "Unexpected user-specified 
%s config: %s found. %sUser setting (%s) will be ignored and the Streams 
default setting (%s) will be used ";
-        final String eosMessage = PROCESSING_GUARANTEE_CONFIG + " is set to " 
+ getString(PROCESSING_GUARANTEE_CONFIG) + ". Hence, ";
-
-        for (final String config: nonConfigurableConfigs) {
-            if (clientProvidedProps.containsKey(config)) {
-
-                if (CONSUMER_DEFAULT_OVERRIDES.containsKey(config)) {
-                    if 
(!clientProvidedProps.get(config).equals(CONSUMER_DEFAULT_OVERRIDES.get(config)))
 {
-                        log.warn(String.format(nonConfigurableConfigMessage, 
"consumer", config, "", clientProvidedProps.get(config),  
CONSUMER_DEFAULT_OVERRIDES.get(config)));
-                        clientProvidedProps.remove(config);
-                    }
-                } else if (eosEnabled) {
-                    if (CONSUMER_EOS_OVERRIDES.containsKey(config)) {
-                        if 
(!clientProvidedProps.get(config).equals(CONSUMER_EOS_OVERRIDES.get(config))) {
-                            
log.warn(String.format(nonConfigurableConfigMessage,
-                                    "consumer", config, eosMessage, 
clientProvidedProps.get(config), CONSUMER_EOS_OVERRIDES.get(config)));
-                            clientProvidedProps.remove(config);
-                        }
-                    } else if (PRODUCER_EOS_OVERRIDES.containsKey(config)) {
-                        if 
(!clientProvidedProps.get(config).equals(PRODUCER_EOS_OVERRIDES.get(config))) {
-                            
log.warn(String.format(nonConfigurableConfigMessage,
-                                    "producer", config, eosMessage, 
clientProvidedProps.get(config), PRODUCER_EOS_OVERRIDES.get(config)));
-                            clientProvidedProps.remove(config);
-                        }
-                    } else if 
(ProducerConfig.TRANSACTIONAL_ID_CONFIG.equals(config)) {
-                        log.warn(String.format(nonConfigurableConfigMessage,
-                            "producer", config, eosMessage, 
clientProvidedProps.get(config), "<appId>-<generatedSuffix>"));
-                        clientProvidedProps.remove(config);
-                    }
-                }
+    private void validateConsumerPropertyMap(final Map<String, Object> props) {
+        if (eosEnabled) {
+            // Iterate over KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED and 
override values if set
+            for (final Map.Entry<String, Object> entry : 
KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED.entrySet()) {
+                overwritePropertyMap(props, entry.getKey(), entry.getValue(), 
"consumer");
+            }
+            
verifyMaxInFlightRequestPerConnection(props.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION));
+
+        } else {
+            // Iterate over KS_CONTROLLED_CONSUMER_CONFIGS and override values 
if set
+            for (final Map.Entry<String, Object> entry : 
KS_CONTROLLED_CONSUMER_CONFIGS.entrySet()) {
+                overwritePropertyMap(props, entry.getKey(), entry.getValue(), 
"consumer");
             }
         }
+    }
 
+    private void validateProducerPropertyMap(final Map<String, Object> props) {
         if (eosEnabled) {
-            
verifyMaxInFlightRequestPerConnection(clientProvidedProps.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION));
+            // Iterate over KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED and 
override values if set

Review Comment:
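   Same as for the identical comment in `validateConsumerPropertyMap`: unnecessary comment -- code explains itself.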



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1500,68 +1516,71 @@ private void validateRackAwarenessConfiguration() {
         });
     }
 
+    private void overwritePropertyMap(final Map<String, Object> props, final 
String key, final Object value, final String config) {
+        final String overwritePropertyLogMessage = "Unexpected %s config: %s 
found. User setting (%s) will be ignored and the Streams default setting (%s) 
will be used";
+        
+        if (props.containsKey(key) && (!Objects.equals(props.get(key), 
value))) {
+            log.warn(String.format(overwritePropertyLogMessage, config, key, 
props.get(key), value));
+        }
+
+        props.put(key, value);
+    }
+
     private Map<String, Object> getCommonConsumerConfigs() {
         final Map<String, Object> clientProvidedProps = 
getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
 
-        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, 
NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
-        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, 
NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
-
-        final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? 
CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
+        // Create a consumer config map with custom default values set by 
Kafka Streams
+        final Map<String, Object> consumerProps = new 
HashMap<>(KS_DEFAULT_CONSUMER_CONFIGS);
         if (StreamsConfigUtils.processingMode(this) == 
StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2) {
             
consumerProps.put("internal.throw.on.fetch.stable.offset.unsupported", true);
         }
+        
         consumerProps.putAll(getClientCustomProps());
         consumerProps.putAll(clientProvidedProps);
 
         // bootstrap.servers should be from StreamsConfig
-        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
originals().get(BOOTSTRAP_SERVERS_CONFIG));
+        overwritePropertyMap(
+            consumerProps,
+            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
+            originals().get(BOOTSTRAP_SERVERS_CONFIG),
+            "consumer"
+        );
 
         return consumerProps;
     }
 
-    private void checkIfUnexpectedUserSpecifiedConsumerConfig(final 
Map<String, Object> clientProvidedProps,
-                                                              final String[] 
nonConfigurableConfigs) {
-        // Streams does not allow users to configure certain consumer/producer 
configurations, for example,
-        // enable.auto.commit. In cases where user tries to override such 
non-configurable
-        // consumer/producer configurations, log a warning and remove the user 
defined value from the Map.
-        // Thus the default values for these consumer/producer configurations 
that are suitable for
-        // Streams will be used instead.
-
-        final String nonConfigurableConfigMessage = "Unexpected user-specified 
%s config: %s found. %sUser setting (%s) will be ignored and the Streams 
default setting (%s) will be used ";
-        final String eosMessage = PROCESSING_GUARANTEE_CONFIG + " is set to " 
+ getString(PROCESSING_GUARANTEE_CONFIG) + ". Hence, ";
-
-        for (final String config: nonConfigurableConfigs) {
-            if (clientProvidedProps.containsKey(config)) {
-
-                if (CONSUMER_DEFAULT_OVERRIDES.containsKey(config)) {
-                    if 
(!clientProvidedProps.get(config).equals(CONSUMER_DEFAULT_OVERRIDES.get(config)))
 {
-                        log.warn(String.format(nonConfigurableConfigMessage, 
"consumer", config, "", clientProvidedProps.get(config),  
CONSUMER_DEFAULT_OVERRIDES.get(config)));
-                        clientProvidedProps.remove(config);
-                    }
-                } else if (eosEnabled) {
-                    if (CONSUMER_EOS_OVERRIDES.containsKey(config)) {
-                        if 
(!clientProvidedProps.get(config).equals(CONSUMER_EOS_OVERRIDES.get(config))) {
-                            
log.warn(String.format(nonConfigurableConfigMessage,
-                                    "consumer", config, eosMessage, 
clientProvidedProps.get(config), CONSUMER_EOS_OVERRIDES.get(config)));
-                            clientProvidedProps.remove(config);
-                        }
-                    } else if (PRODUCER_EOS_OVERRIDES.containsKey(config)) {
-                        if 
(!clientProvidedProps.get(config).equals(PRODUCER_EOS_OVERRIDES.get(config))) {
-                            
log.warn(String.format(nonConfigurableConfigMessage,
-                                    "producer", config, eosMessage, 
clientProvidedProps.get(config), PRODUCER_EOS_OVERRIDES.get(config)));
-                            clientProvidedProps.remove(config);
-                        }
-                    } else if 
(ProducerConfig.TRANSACTIONAL_ID_CONFIG.equals(config)) {
-                        log.warn(String.format(nonConfigurableConfigMessage,
-                            "producer", config, eosMessage, 
clientProvidedProps.get(config), "<appId>-<generatedSuffix>"));
-                        clientProvidedProps.remove(config);
-                    }
-                }
+    private void validateConsumerPropertyMap(final Map<String, Object> props) {
+        if (eosEnabled) {
+            // Iterate over KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED and 
override values if set
+            for (final Map.Entry<String, Object> entry : 
KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED.entrySet()) {
+                overwritePropertyMap(props, entry.getKey(), entry.getValue(), 
"consumer");
+            }
+            
verifyMaxInFlightRequestPerConnection(props.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION));
+
+        } else {
+            // Iterate over KS_CONTROLLED_CONSUMER_CONFIGS and override values 
if set
+            for (final Map.Entry<String, Object> entry : 
KS_CONTROLLED_CONSUMER_CONFIGS.entrySet()) {
+                overwritePropertyMap(props, entry.getKey(), entry.getValue(), 
"consumer");
             }
         }
+    }
 
+    private void validateProducerPropertyMap(final Map<String, Object> props) {
         if (eosEnabled) {
-            
verifyMaxInFlightRequestPerConnection(clientProvidedProps.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION));
+            // Iterate over KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED and 
override values if set
+            for (final Map.Entry<String, Object> entry : 
KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED.entrySet()) {
+                overwritePropertyMap(props, entry.getKey(), entry.getValue(), 
"producer");
+            }
+            
verifyMaxInFlightRequestPerConnection(props.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION));
+
+        }
+
+        if (props.containsKey(ProducerConfig.PARTITIONER_CLASS_CONFIG)) {
+            final Class<?> c = (Class<?>) 
props.get(ProducerConfig.PARTITIONER_CLASS_CONFIG);
+            if (!StreamPartitioner.class.isAssignableFrom(c)) {

Review Comment:
   `StreamPartitioner` and the producer `Partitioner` interface are totally 
independent -- I would never expect a class to implement both interfaces. Not 
sure what the purpose of this check is?



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1182,41 +1175,64 @@ public class StreamsConfig extends AbstractConfig {
                     WINDOW_SIZE_MS_DOC);
     }
 
-    // this is the list of configs for underlying clients
-    // that streams prefer different default values
-    private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
+    // KS_DEFAULT_PRODUCER_CONFIGS - default producer configs for Kafka Streams
+    private static final Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS;
     static {
         final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>();
         tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, 
"100");
-        PRODUCER_DEFAULT_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+
+        KS_DEFAULT_PRODUCER_CONFIGS = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
 
-    private static final Map<String, Object> PRODUCER_EOS_OVERRIDES;
+    // KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED - default producer configs for 
Kafka Streams with EOS enabled
+    private static final Map<String, Object> 
KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED;
     static {
-        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
+        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>(KS_DEFAULT_PRODUCER_CONFIGS);
         
tempProducerDefaultOverrides.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
Integer.MAX_VALUE);
-        
tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
true);
-        // Reduce the transaction timeout for quicker pending offset 
expiration on broker side.
         
tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
DEFAULT_TRANSACTION_TIMEOUT);
 
-        PRODUCER_EOS_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+        KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+    }
+
+    // KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED - Kafka Streams producer 
configs that cannot be overridden by the user with EOS enabled
+    private static final Map<String, Object> 
KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED;
+    static {
+        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>();
+        
tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
true);
+        
tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, null);
+
+        KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
 
-    private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;
+    // KS_DEFAULT_CONSUMER_CONFIGS - default consumer configs for Kafka Streams
+    private static final Map<String, Object> KS_DEFAULT_CONSUMER_CONFIGS;
     static {
         final Map<String, Object> tempConsumerDefaultOverrides = new 
HashMap<>();
         
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
"1000");
         
tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
-        
tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false");
         tempConsumerDefaultOverrides.put("internal.leave.group.on.close", 
false);
-        CONSUMER_DEFAULT_OVERRIDES = 
Collections.unmodifiableMap(tempConsumerDefaultOverrides);
+
+        KS_DEFAULT_CONSUMER_CONFIGS = 
Collections.unmodifiableMap(tempConsumerDefaultOverrides);
     }
 
-    private static final Map<String, Object> CONSUMER_EOS_OVERRIDES;
+    // KS_CONTROLLED_CONSUMER_CONFIGS - Kafka Streams consumer configs that 
cannot be overridden by the user
+    private static final Map<String, Object> KS_CONTROLLED_CONSUMER_CONFIGS;
     static {
-        final Map<String, Object> tempConsumerDefaultOverrides = new 
HashMap<>(CONSUMER_DEFAULT_OVERRIDES);
+        final Map<String, Object> tempConsumerDefaultOverrides = new 
HashMap<>();
+        
tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false");
+        
tempConsumerDefaultOverrides.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,
 "false");
+
+        KS_CONTROLLED_CONSUMER_CONFIGS = 
Collections.unmodifiableMap(tempConsumerDefaultOverrides);
+    }
+
+    // KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED - Kafka Streams consumer 
configs that cannot be overridden by the user with EOS enabled

Review Comment:
   omit?



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1500,68 +1516,71 @@ private void validateRackAwarenessConfiguration() {
         });
     }
 
+    private void overwritePropertyMap(final Map<String, Object> props, final 
String key, final Object value, final String config) {
+        final String overwritePropertyLogMessage = "Unexpected %s config: %s 
found. User setting (%s) will be ignored and the Streams default setting (%s) 
will be used";
+        
+        if (props.containsKey(key) && (!Objects.equals(props.get(key), 
value))) {
+            log.warn(String.format(overwritePropertyLogMessage, config, key, 
props.get(key), value));
+        }
+
+        props.put(key, value);
+    }
+
     private Map<String, Object> getCommonConsumerConfigs() {
         final Map<String, Object> clientProvidedProps = 
getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
 
-        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, 
NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
-        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, 
NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
-
-        final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? 
CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
+        // Create a consumer config map with custom default values set by 
Kafka Streams
+        final Map<String, Object> consumerProps = new 
HashMap<>(KS_DEFAULT_CONSUMER_CONFIGS);
         if (StreamsConfigUtils.processingMode(this) == 
StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2) {
             
consumerProps.put("internal.throw.on.fetch.stable.offset.unsupported", true);
         }
+        
         consumerProps.putAll(getClientCustomProps());
         consumerProps.putAll(clientProvidedProps);
 
         // bootstrap.servers should be from StreamsConfig
-        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
originals().get(BOOTSTRAP_SERVERS_CONFIG));
+        overwritePropertyMap(
+            consumerProps,
+            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
+            originals().get(BOOTSTRAP_SERVERS_CONFIG),
+            "consumer"
+        );
 
         return consumerProps;
     }
 
-    private void checkIfUnexpectedUserSpecifiedConsumerConfig(final 
Map<String, Object> clientProvidedProps,
-                                                              final String[] 
nonConfigurableConfigs) {
-        // Streams does not allow users to configure certain consumer/producer 
configurations, for example,
-        // enable.auto.commit. In cases where user tries to override such 
non-configurable
-        // consumer/producer configurations, log a warning and remove the user 
defined value from the Map.
-        // Thus the default values for these consumer/producer configurations 
that are suitable for
-        // Streams will be used instead.
-
-        final String nonConfigurableConfigMessage = "Unexpected user-specified 
%s config: %s found. %sUser setting (%s) will be ignored and the Streams 
default setting (%s) will be used ";
-        final String eosMessage = PROCESSING_GUARANTEE_CONFIG + " is set to " 
+ getString(PROCESSING_GUARANTEE_CONFIG) + ". Hence, ";
-
-        for (final String config: nonConfigurableConfigs) {
-            if (clientProvidedProps.containsKey(config)) {
-
-                if (CONSUMER_DEFAULT_OVERRIDES.containsKey(config)) {
-                    if 
(!clientProvidedProps.get(config).equals(CONSUMER_DEFAULT_OVERRIDES.get(config)))
 {
-                        log.warn(String.format(nonConfigurableConfigMessage, 
"consumer", config, "", clientProvidedProps.get(config),  
CONSUMER_DEFAULT_OVERRIDES.get(config)));
-                        clientProvidedProps.remove(config);
-                    }
-                } else if (eosEnabled) {
-                    if (CONSUMER_EOS_OVERRIDES.containsKey(config)) {
-                        if 
(!clientProvidedProps.get(config).equals(CONSUMER_EOS_OVERRIDES.get(config))) {
-                            
log.warn(String.format(nonConfigurableConfigMessage,
-                                    "consumer", config, eosMessage, 
clientProvidedProps.get(config), CONSUMER_EOS_OVERRIDES.get(config)));
-                            clientProvidedProps.remove(config);
-                        }
-                    } else if (PRODUCER_EOS_OVERRIDES.containsKey(config)) {
-                        if 
(!clientProvidedProps.get(config).equals(PRODUCER_EOS_OVERRIDES.get(config))) {
-                            
log.warn(String.format(nonConfigurableConfigMessage,
-                                    "producer", config, eosMessage, 
clientProvidedProps.get(config), PRODUCER_EOS_OVERRIDES.get(config)));
-                            clientProvidedProps.remove(config);
-                        }
-                    } else if 
(ProducerConfig.TRANSACTIONAL_ID_CONFIG.equals(config)) {
-                        log.warn(String.format(nonConfigurableConfigMessage,
-                            "producer", config, eosMessage, 
clientProvidedProps.get(config), "<appId>-<generatedSuffix>"));
-                        clientProvidedProps.remove(config);
-                    }
-                }
+    private void validateConsumerPropertyMap(final Map<String, Object> props) {
+        if (eosEnabled) {
+            // Iterate over KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED and 
override values if set

Review Comment:
   Unnecessary comment -- code explains itself.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1182,41 +1175,64 @@ public class StreamsConfig extends AbstractConfig {
                     WINDOW_SIZE_MS_DOC);
     }
 
-    // this is the list of configs for underlying clients
-    // that streams prefer different default values
-    private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
+    // KS_DEFAULT_PRODUCER_CONFIGS - default producer configs for Kafka Streams
+    private static final Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS;
     static {
         final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>();
         tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, 
"100");
-        PRODUCER_DEFAULT_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+
+        KS_DEFAULT_PRODUCER_CONFIGS = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
 
-    private static final Map<String, Object> PRODUCER_EOS_OVERRIDES;
+    // KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED - default producer configs for 
Kafka Streams with EOS enabled
+    private static final Map<String, Object> 
KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED;
     static {
-        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
+        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>(KS_DEFAULT_PRODUCER_CONFIGS);
         
tempProducerDefaultOverrides.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
Integer.MAX_VALUE);
-        
tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
true);
-        // Reduce the transaction timeout for quicker pending offset 
expiration on broker side.
         
tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
DEFAULT_TRANSACTION_TIMEOUT);
 
-        PRODUCER_EOS_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+        KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+    }
+
+    // KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED - Kafka Streams producer 
configs that cannot be overridden by the user with EOS enabled
+    private static final Map<String, Object> 
KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED;
+    static {
+        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>();
+        
tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
true);
+        
tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, null);
+
+        KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
 
-    private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;
+    // KS_DEFAULT_CONSUMER_CONFIGS - default consumer configs for Kafka Streams
+    private static final Map<String, Object> KS_DEFAULT_CONSUMER_CONFIGS;
     static {
         final Map<String, Object> tempConsumerDefaultOverrides = new 
HashMap<>();
         
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
"1000");
         
tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
-        
tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false");
         tempConsumerDefaultOverrides.put("internal.leave.group.on.close", 
false);
-        CONSUMER_DEFAULT_OVERRIDES = 
Collections.unmodifiableMap(tempConsumerDefaultOverrides);
+
+        KS_DEFAULT_CONSUMER_CONFIGS = 
Collections.unmodifiableMap(tempConsumerDefaultOverrides);
     }
 
-    private static final Map<String, Object> CONSUMER_EOS_OVERRIDES;
+    // KS_CONTROLLED_CONSUMER_CONFIGS - Kafka Streams consumer configs that 
cannot be overridden by the user
+    private static final Map<String, Object> KS_CONTROLLED_CONSUMER_CONFIGS;
     static {
-        final Map<String, Object> tempConsumerDefaultOverrides = new 
HashMap<>(CONSUMER_DEFAULT_OVERRIDES);
+        final Map<String, Object> tempConsumerDefaultOverrides = new 
HashMap<>();
+        
tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false");
+        
tempConsumerDefaultOverrides.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,
 "false");
+
+        KS_CONTROLLED_CONSUMER_CONFIGS = 
Collections.unmodifiableMap(tempConsumerDefaultOverrides);
+    }
+
+    // KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED - Kafka Streams consumer 
configs that cannot be overridden by the user with EOS enabled

Review Comment:
   ```suggestion
       // Kafka Streams consumer configs that cannot be overridden by the user 
with EOS enabled
   ```



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1182,41 +1175,64 @@ public class StreamsConfig extends AbstractConfig {
                     WINDOW_SIZE_MS_DOC);
     }
 
-    // this is the list of configs for underlying clients
-    // that streams prefer different default values
-    private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
+    // KS_DEFAULT_PRODUCER_CONFIGS - default producer configs for Kafka Streams
+    private static final Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS;
     static {
         final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>();
         tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, 
"100");
-        PRODUCER_DEFAULT_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+
+        KS_DEFAULT_PRODUCER_CONFIGS = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
 
-    private static final Map<String, Object> PRODUCER_EOS_OVERRIDES;
+    // KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED - default producer configs for 
Kafka Streams with EOS enabled
+    private static final Map<String, Object> 
KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED;
     static {
-        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
+        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>(KS_DEFAULT_PRODUCER_CONFIGS);
         
tempProducerDefaultOverrides.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
Integer.MAX_VALUE);
-        
tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
true);
-        // Reduce the transaction timeout for quicker pending offset 
expiration on broker side.

Review Comment:
   Why do we remove this comment?



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1182,41 +1175,64 @@ public class StreamsConfig extends AbstractConfig {
                     WINDOW_SIZE_MS_DOC);
     }
 
-    // this is the list of configs for underlying clients
-    // that streams prefer different default values
-    private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
+    // KS_DEFAULT_PRODUCER_CONFIGS - default producer configs for Kafka Streams
+    private static final Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS;
     static {
         final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>();
         tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, 
"100");
-        PRODUCER_DEFAULT_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+
+        KS_DEFAULT_PRODUCER_CONFIGS = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
 
-    private static final Map<String, Object> PRODUCER_EOS_OVERRIDES;
+    // KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED - default producer configs for 
Kafka Streams with EOS enabled
+    private static final Map<String, Object> 
KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED;
     static {
-        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
+        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>(KS_DEFAULT_PRODUCER_CONFIGS);
         
tempProducerDefaultOverrides.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
Integer.MAX_VALUE);
-        
tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
true);
-        // Reduce the transaction timeout for quicker pending offset 
expiration on broker side.
         
tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
DEFAULT_TRANSACTION_TIMEOUT);
 
-        PRODUCER_EOS_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+        KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+    }
+
+    // KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED - Kafka Streams producer 
configs that cannot be overridden by the user with EOS enabled
+    private static final Map<String, Object> 
KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED;
+    static {
+        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>();
+        
tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
true);
+        
tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, null);
+
+        KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
 
-    private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;
+    // KS_DEFAULT_CONSUMER_CONFIGS - default consumer configs for Kafka Streams

Review Comment:
   ```suggestion
       // default consumer configs for Kafka Streams
   ```



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1182,41 +1175,64 @@ public class StreamsConfig extends AbstractConfig {
                     WINDOW_SIZE_MS_DOC);
     }
 
-    // this is the list of configs for underlying clients
-    // that streams prefer different default values
-    private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
+    // KS_DEFAULT_PRODUCER_CONFIGS - default producer configs for Kafka Streams
+    private static final Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS;
     static {
         final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>();
         tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, 
"100");
-        PRODUCER_DEFAULT_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+
+        KS_DEFAULT_PRODUCER_CONFIGS = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
 
-    private static final Map<String, Object> PRODUCER_EOS_OVERRIDES;
+    // KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED - default producer configs for 
Kafka Streams with EOS enabled
+    private static final Map<String, Object> 
KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED;
     static {
-        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
+        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>(KS_DEFAULT_PRODUCER_CONFIGS);
         
tempProducerDefaultOverrides.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
Integer.MAX_VALUE);

Review Comment:
   Wondering if we should treat the delivery timeout as a non-overwritable config, too?



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1500,68 +1516,71 @@ private void validateRackAwarenessConfiguration() {
         });
     }
 
+    private void overwritePropertyMap(final Map<String, Object> props, final 
String key, final Object value, final String config) {

Review Comment:
   ```suggestion
       private void overwritePropertyMap(final Map<String, Object> props, final 
String key, final Object unmodifiableDefaultValue, final String config) {
   ```



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1182,41 +1175,64 @@ public class StreamsConfig extends AbstractConfig {
                     WINDOW_SIZE_MS_DOC);
     }
 
-    // this is the list of configs for underlying clients
-    // that streams prefer different default values
-    private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
+    // KS_DEFAULT_PRODUCER_CONFIGS - default producer configs for Kafka Streams
+    private static final Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS;
     static {
         final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>();
         tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, 
"100");
-        PRODUCER_DEFAULT_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+
+        KS_DEFAULT_PRODUCER_CONFIGS = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
 
-    private static final Map<String, Object> PRODUCER_EOS_OVERRIDES;
+    // KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED - default producer configs for 
Kafka Streams with EOS enabled
+    private static final Map<String, Object> 
KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED;
     static {
-        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
+        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>(KS_DEFAULT_PRODUCER_CONFIGS);
         
tempProducerDefaultOverrides.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
Integer.MAX_VALUE);
-        
tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
true);
-        // Reduce the transaction timeout for quicker pending offset 
expiration on broker side.
         
tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
DEFAULT_TRANSACTION_TIMEOUT);
 
-        PRODUCER_EOS_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+        KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+    }
+
+    // KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED - Kafka Streams producer 
configs that cannot be overridden by the user with EOS enabled

Review Comment:
   ```suggestion
       // Kafka Streams producer configs that cannot be overridden by the user 
with EOS enabled
   ```



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1182,41 +1175,64 @@ public class StreamsConfig extends AbstractConfig {
                     WINDOW_SIZE_MS_DOC);
     }
 
-    // this is the list of configs for underlying clients
-    // that streams prefer different default values
-    private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
+    // KS_DEFAULT_PRODUCER_CONFIGS - default producer configs for Kafka Streams
+    private static final Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS;
     static {
         final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>();
         tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, 
"100");
-        PRODUCER_DEFAULT_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+
+        KS_DEFAULT_PRODUCER_CONFIGS = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
 
-    private static final Map<String, Object> PRODUCER_EOS_OVERRIDES;
+    // KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED - default producer configs for 
Kafka Streams with EOS enabled

Review Comment:
   ```suggestion
       // default producer configs for Kafka Streams with EOS enabled
   ```



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1500,68 +1516,71 @@ private void validateRackAwarenessConfiguration() {
         });
     }
 
+    private void overwritePropertyMap(final Map<String, Object> props, final 
String key, final Object value, final String config) {
+        final String overwritePropertyLogMessage = "Unexpected %s config: %s 
found. User setting (%s) will be ignored and the Streams default setting (%s) 
will be used";

Review Comment:
   ```suggestion
           final String overwritePropertyLogMessage = "Unexpected %s config 
`%s` found. User setting (%s) will be ignored and the Kafka Streams default 
setting (%s) will be used";
   ```



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1500,68 +1516,71 @@ private void validateRackAwarenessConfiguration() {
         });
     }
 
+    private void overwritePropertyMap(final Map<String, Object> props, final 
String key, final Object value, final String config) {
+        final String overwritePropertyLogMessage = "Unexpected %s config: %s 
found. User setting (%s) will be ignored and the Streams default setting (%s) 
will be used";
+        
+        if (props.containsKey(key) && (!Objects.equals(props.get(key), 
value))) {
+            log.warn(String.format(overwritePropertyLogMessage, config, key, 
props.get(key), value));
+        }
+
+        props.put(key, value);
+    }
+
     private Map<String, Object> getCommonConsumerConfigs() {
         final Map<String, Object> clientProvidedProps = 
getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
 
-        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, 
NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
-        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, 
NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
-
-        final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? 
CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
+        // Create a consumer config map with custom default values set by 
Kafka Streams
+        final Map<String, Object> consumerProps = new 
HashMap<>(KS_DEFAULT_CONSUMER_CONFIGS);
         if (StreamsConfigUtils.processingMode(this) == 
StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2) {
             
consumerProps.put("internal.throw.on.fetch.stable.offset.unsupported", true);
         }
+        
         consumerProps.putAll(getClientCustomProps());
         consumerProps.putAll(clientProvidedProps);
 
         // bootstrap.servers should be from StreamsConfig
-        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
originals().get(BOOTSTRAP_SERVERS_CONFIG));
+        overwritePropertyMap(
+            consumerProps,
+            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
+            originals().get(BOOTSTRAP_SERVERS_CONFIG),
+            "consumer"
+        );
 
         return consumerProps;
     }
 
-    private void checkIfUnexpectedUserSpecifiedConsumerConfig(final 
Map<String, Object> clientProvidedProps,
-                                                              final String[] 
nonConfigurableConfigs) {
-        // Streams does not allow users to configure certain consumer/producer 
configurations, for example,
-        // enable.auto.commit. In cases where user tries to override such 
non-configurable
-        // consumer/producer configurations, log a warning and remove the user 
defined value from the Map.
-        // Thus the default values for these consumer/producer configurations 
that are suitable for
-        // Streams will be used instead.
-
-        final String nonConfigurableConfigMessage = "Unexpected user-specified 
%s config: %s found. %sUser setting (%s) will be ignored and the Streams 
default setting (%s) will be used ";
-        final String eosMessage = PROCESSING_GUARANTEE_CONFIG + " is set to " 
+ getString(PROCESSING_GUARANTEE_CONFIG) + ". Hence, ";
-
-        for (final String config: nonConfigurableConfigs) {
-            if (clientProvidedProps.containsKey(config)) {
-
-                if (CONSUMER_DEFAULT_OVERRIDES.containsKey(config)) {
-                    if 
(!clientProvidedProps.get(config).equals(CONSUMER_DEFAULT_OVERRIDES.get(config)))
 {
-                        log.warn(String.format(nonConfigurableConfigMessage, 
"consumer", config, "", clientProvidedProps.get(config),  
CONSUMER_DEFAULT_OVERRIDES.get(config)));
-                        clientProvidedProps.remove(config);
-                    }
-                } else if (eosEnabled) {
-                    if (CONSUMER_EOS_OVERRIDES.containsKey(config)) {
-                        if 
(!clientProvidedProps.get(config).equals(CONSUMER_EOS_OVERRIDES.get(config))) {
-                            
log.warn(String.format(nonConfigurableConfigMessage,
-                                    "consumer", config, eosMessage, 
clientProvidedProps.get(config), CONSUMER_EOS_OVERRIDES.get(config)));
-                            clientProvidedProps.remove(config);
-                        }
-                    } else if (PRODUCER_EOS_OVERRIDES.containsKey(config)) {
-                        if 
(!clientProvidedProps.get(config).equals(PRODUCER_EOS_OVERRIDES.get(config))) {
-                            
log.warn(String.format(nonConfigurableConfigMessage,
-                                    "producer", config, eosMessage, 
clientProvidedProps.get(config), PRODUCER_EOS_OVERRIDES.get(config)));
-                            clientProvidedProps.remove(config);
-                        }
-                    } else if 
(ProducerConfig.TRANSACTIONAL_ID_CONFIG.equals(config)) {
-                        log.warn(String.format(nonConfigurableConfigMessage,
-                            "producer", config, eosMessage, 
clientProvidedProps.get(config), "<appId>-<generatedSuffix>"));
-                        clientProvidedProps.remove(config);
-                    }
-                }
+    private void validateConsumerPropertyMap(final Map<String, Object> props) {
+        if (eosEnabled) {
+            // Iterate over KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED and 
override values if set
+            for (final Map.Entry<String, Object> entry : 
KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED.entrySet()) {
+                overwritePropertyMap(props, entry.getKey(), entry.getValue(), 
"consumer");
+            }
+            
verifyMaxInFlightRequestPerConnection(props.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION));
+
+        } else {
+            // Iterate over KS_CONTROLLED_CONSUMER_CONFIGS and override values 
if set

Review Comment:
   as above



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1500,68 +1516,71 @@ private void validateRackAwarenessConfiguration() {
         });
     }
 
+    private void overwritePropertyMap(final Map<String, Object> props, final 
String key, final Object value, final String config) {
+        final String overwritePropertyLogMessage = "Unexpected %s config: %s 
found. User setting (%s) will be ignored and the Streams default setting (%s) 
will be used";
+        
+        if (props.containsKey(key) && (!Objects.equals(props.get(key), 
value))) {
+            log.warn(String.format(overwritePropertyLogMessage, config, key, 
props.get(key), value));
+        }
+
+        props.put(key, value);
+    }
+
     private Map<String, Object> getCommonConsumerConfigs() {
         final Map<String, Object> clientProvidedProps = 
getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
 
-        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, 
NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
-        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, 
NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
-
-        final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? 
CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
+        // Create a consumer config map with custom default values set by 
Kafka Streams
+        final Map<String, Object> consumerProps = new 
HashMap<>(KS_DEFAULT_CONSUMER_CONFIGS);
         if (StreamsConfigUtils.processingMode(this) == 
StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2) {
             
consumerProps.put("internal.throw.on.fetch.stable.offset.unsupported", true);
         }
+        
         consumerProps.putAll(getClientCustomProps());
         consumerProps.putAll(clientProvidedProps);
 
         // bootstrap.servers should be from StreamsConfig
-        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
originals().get(BOOTSTRAP_SERVERS_CONFIG));
+        overwritePropertyMap(
+            consumerProps,
+            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
+            originals().get(BOOTSTRAP_SERVERS_CONFIG),
+            "consumer"
+        );
 
         return consumerProps;
     }
 
-    private void checkIfUnexpectedUserSpecifiedConsumerConfig(final 
Map<String, Object> clientProvidedProps,
-                                                              final String[] 
nonConfigurableConfigs) {
-        // Streams does not allow users to configure certain consumer/producer 
configurations, for example,
-        // enable.auto.commit. In cases where user tries to override such 
non-configurable
-        // consumer/producer configurations, log a warning and remove the user 
defined value from the Map.
-        // Thus the default values for these consumer/producer configurations 
that are suitable for
-        // Streams will be used instead.
-
-        final String nonConfigurableConfigMessage = "Unexpected user-specified 
%s config: %s found. %sUser setting (%s) will be ignored and the Streams 
default setting (%s) will be used ";
-        final String eosMessage = PROCESSING_GUARANTEE_CONFIG + " is set to " 
+ getString(PROCESSING_GUARANTEE_CONFIG) + ". Hence, ";
-
-        for (final String config: nonConfigurableConfigs) {
-            if (clientProvidedProps.containsKey(config)) {
-
-                if (CONSUMER_DEFAULT_OVERRIDES.containsKey(config)) {
-                    if 
(!clientProvidedProps.get(config).equals(CONSUMER_DEFAULT_OVERRIDES.get(config)))
 {
-                        log.warn(String.format(nonConfigurableConfigMessage, 
"consumer", config, "", clientProvidedProps.get(config),  
CONSUMER_DEFAULT_OVERRIDES.get(config)));
-                        clientProvidedProps.remove(config);
-                    }
-                } else if (eosEnabled) {
-                    if (CONSUMER_EOS_OVERRIDES.containsKey(config)) {
-                        if 
(!clientProvidedProps.get(config).equals(CONSUMER_EOS_OVERRIDES.get(config))) {
-                            
log.warn(String.format(nonConfigurableConfigMessage,
-                                    "consumer", config, eosMessage, 
clientProvidedProps.get(config), CONSUMER_EOS_OVERRIDES.get(config)));
-                            clientProvidedProps.remove(config);
-                        }
-                    } else if (PRODUCER_EOS_OVERRIDES.containsKey(config)) {
-                        if 
(!clientProvidedProps.get(config).equals(PRODUCER_EOS_OVERRIDES.get(config))) {
-                            
log.warn(String.format(nonConfigurableConfigMessage,
-                                    "producer", config, eosMessage, 
clientProvidedProps.get(config), PRODUCER_EOS_OVERRIDES.get(config)));
-                            clientProvidedProps.remove(config);
-                        }
-                    } else if 
(ProducerConfig.TRANSACTIONAL_ID_CONFIG.equals(config)) {
-                        log.warn(String.format(nonConfigurableConfigMessage,
-                            "producer", config, eosMessage, 
clientProvidedProps.get(config), "<appId>-<generatedSuffix>"));
-                        clientProvidedProps.remove(config);
-                    }
-                }
+    private void validateConsumerPropertyMap(final Map<String, Object> props) {
+        if (eosEnabled) {
+            // Iterate over KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED and 
override values if set
+            for (final Map.Entry<String, Object> entry : 
KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED.entrySet()) {
+                overwritePropertyMap(props, entry.getKey(), entry.getValue(), 
"consumer");
+            }
+            
verifyMaxInFlightRequestPerConnection(props.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION));

Review Comment:
   Why do we do this in the validate _consumer_ properties step? It's a 
producer config.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1751,7 +1774,15 @@ public Map<String, Object> getProducerConfigs(final 
String clientId) {
             props.put("internal.auto.downgrade.txn.commit", true);
         }
 
+        validateProducerPropertyMap(props);
+
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
originals().get(BOOTSTRAP_SERVERS_CONFIG));

Review Comment:
   This line should be removed (it's effectively replaced by the new line 
added below)?



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1681,6 +1702,8 @@ public Map<String, Object> 
getRestoreConsumerConfigs(final String clientId) {
         final Map<String, Object> restoreConsumerProps = 
originalsWithPrefix(RESTORE_CONSUMER_PREFIX);
         baseConsumerProps.putAll(restoreConsumerProps);
 
+        validateConsumerPropertyMap(baseConsumerProps);
+
         // no need to set group id for a restore consumer
         baseConsumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);

Review Comment:
   Should we also log a WARN if somebody tries to set it? Same for 
`group.instance.id` and `auto.offset.reset` below? Also applies to global 
consumer.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1182,41 +1175,64 @@ public class StreamsConfig extends AbstractConfig {
                     WINDOW_SIZE_MS_DOC);
     }
 
-    // this is the list of configs for underlying clients
-    // that streams prefer different default values
-    private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
+    // KS_DEFAULT_PRODUCER_CONFIGS - default producer configs for Kafka Streams

Review Comment:
   Do we actually need this comment at all? Seems it's self-explanatory?



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1182,41 +1175,64 @@ public class StreamsConfig extends AbstractConfig {
                     WINDOW_SIZE_MS_DOC);
     }
 
-    // this is the list of configs for underlying clients
-    // that streams prefer different default values
-    private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
+    // KS_DEFAULT_PRODUCER_CONFIGS - default producer configs for Kafka Streams
+    private static final Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS;
     static {
         final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>();
         tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, 
"100");
-        PRODUCER_DEFAULT_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+
+        KS_DEFAULT_PRODUCER_CONFIGS = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
 
-    private static final Map<String, Object> PRODUCER_EOS_OVERRIDES;
+    // KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED - default producer configs for 
Kafka Streams with EOS enabled
+    private static final Map<String, Object> 
KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED;
     static {
-        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
+        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>(KS_DEFAULT_PRODUCER_CONFIGS);
         
tempProducerDefaultOverrides.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
Integer.MAX_VALUE);
-        
tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
true);
-        // Reduce the transaction timeout for quicker pending offset 
expiration on broker side.
         
tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
DEFAULT_TRANSACTION_TIMEOUT);
 
-        PRODUCER_EOS_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+        KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+    }
+
+    // KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED - Kafka Streams producer 
configs that cannot be overridden by the user with EOS enabled
+    private static final Map<String, Object> 
KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED;
+    static {
+        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>();
+        
tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
true);
+        
tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, null);
+
+        KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
 
-    private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;
+    // KS_DEFAULT_CONSUMER_CONFIGS - default consumer configs for Kafka Streams
+    private static final Map<String, Object> KS_DEFAULT_CONSUMER_CONFIGS;
     static {
         final Map<String, Object> tempConsumerDefaultOverrides = new 
HashMap<>();
         
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
"1000");
         
tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
-        
tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false");
         tempConsumerDefaultOverrides.put("internal.leave.group.on.close", 
false);
-        CONSUMER_DEFAULT_OVERRIDES = 
Collections.unmodifiableMap(tempConsumerDefaultOverrides);
+
+        KS_DEFAULT_CONSUMER_CONFIGS = 
Collections.unmodifiableMap(tempConsumerDefaultOverrides);
     }
 
-    private static final Map<String, Object> CONSUMER_EOS_OVERRIDES;
+    // KS_CONTROLLED_CONSUMER_CONFIGS - Kafka Streams consumer configs that 
cannot be overridden by the user

Review Comment:
   omit?



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1500,68 +1516,71 @@ private void validateRackAwarenessConfiguration() {
         });
     }
 
+    private void overwritePropertyMap(final Map<String, Object> props, final 
String key, final Object value, final String config) {
+        final String overwritePropertyLogMessage = "Unexpected %s config: %s 
found. User setting (%s) will be ignored and the Streams default setting (%s) 
will be used";
+        
+        if (props.containsKey(key) && (!Objects.equals(props.get(key), 
value))) {
+            log.warn(String.format(overwritePropertyLogMessage, config, key, 
props.get(key), value));
+        }
+
+        props.put(key, value);
+    }
+
     private Map<String, Object> getCommonConsumerConfigs() {
         final Map<String, Object> clientProvidedProps = 
getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
 
-        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, 
NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
-        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, 
NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
-
-        final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? 
CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
+        // Create a consumer config map with custom default values set by 
Kafka Streams
+        final Map<String, Object> consumerProps = new 
HashMap<>(KS_DEFAULT_CONSUMER_CONFIGS);
         if (StreamsConfigUtils.processingMode(this) == 
StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2) {
             
consumerProps.put("internal.throw.on.fetch.stable.offset.unsupported", true);
         }
+        
         consumerProps.putAll(getClientCustomProps());
         consumerProps.putAll(clientProvidedProps);
 
         // bootstrap.servers should be from StreamsConfig
-        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
originals().get(BOOTSTRAP_SERVERS_CONFIG));
+        overwritePropertyMap(
+            consumerProps,
+            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
+            originals().get(BOOTSTRAP_SERVERS_CONFIG),
+            "consumer"
+        );
 
         return consumerProps;
     }
 
-    private void checkIfUnexpectedUserSpecifiedConsumerConfig(final 
Map<String, Object> clientProvidedProps,
-                                                              final String[] 
nonConfigurableConfigs) {
-        // Streams does not allow users to configure certain consumer/producer 
configurations, for example,
-        // enable.auto.commit. In cases where user tries to override such 
non-configurable
-        // consumer/producer configurations, log a warning and remove the user 
defined value from the Map.
-        // Thus the default values for these consumer/producer configurations 
that are suitable for
-        // Streams will be used instead.
-
-        final String nonConfigurableConfigMessage = "Unexpected user-specified 
%s config: %s found. %sUser setting (%s) will be ignored and the Streams 
default setting (%s) will be used ";
-        final String eosMessage = PROCESSING_GUARANTEE_CONFIG + " is set to " 
+ getString(PROCESSING_GUARANTEE_CONFIG) + ". Hence, ";
-
-        for (final String config: nonConfigurableConfigs) {
-            if (clientProvidedProps.containsKey(config)) {
-
-                if (CONSUMER_DEFAULT_OVERRIDES.containsKey(config)) {
-                    if 
(!clientProvidedProps.get(config).equals(CONSUMER_DEFAULT_OVERRIDES.get(config)))
 {
-                        log.warn(String.format(nonConfigurableConfigMessage, 
"consumer", config, "", clientProvidedProps.get(config),  
CONSUMER_DEFAULT_OVERRIDES.get(config)));
-                        clientProvidedProps.remove(config);
-                    }
-                } else if (eosEnabled) {
-                    if (CONSUMER_EOS_OVERRIDES.containsKey(config)) {
-                        if 
(!clientProvidedProps.get(config).equals(CONSUMER_EOS_OVERRIDES.get(config))) {
-                            
log.warn(String.format(nonConfigurableConfigMessage,
-                                    "consumer", config, eosMessage, 
clientProvidedProps.get(config), CONSUMER_EOS_OVERRIDES.get(config)));
-                            clientProvidedProps.remove(config);
-                        }
-                    } else if (PRODUCER_EOS_OVERRIDES.containsKey(config)) {
-                        if 
(!clientProvidedProps.get(config).equals(PRODUCER_EOS_OVERRIDES.get(config))) {
-                            
log.warn(String.format(nonConfigurableConfigMessage,
-                                    "producer", config, eosMessage, 
clientProvidedProps.get(config), PRODUCER_EOS_OVERRIDES.get(config)));
-                            clientProvidedProps.remove(config);
-                        }
-                    } else if 
(ProducerConfig.TRANSACTIONAL_ID_CONFIG.equals(config)) {
-                        log.warn(String.format(nonConfigurableConfigMessage,
-                            "producer", config, eosMessage, 
clientProvidedProps.get(config), "<appId>-<generatedSuffix>"));
-                        clientProvidedProps.remove(config);
-                    }
-                }
+    private void validateConsumerPropertyMap(final Map<String, Object> props) {
+        if (eosEnabled) {
+            // Iterate over KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED and 
override values if set
+            for (final Map.Entry<String, Object> entry : 
KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED.entrySet()) {
+                overwritePropertyMap(props, entry.getKey(), entry.getValue(), 
"consumer");
+            }
+            
verifyMaxInFlightRequestPerConnection(props.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION));
+
+        } else {
+            // Iterate over KS_CONTROLLED_CONSUMER_CONFIGS and override values 
if set
+            for (final Map.Entry<String, Object> entry : 
KS_CONTROLLED_CONSUMER_CONFIGS.entrySet()) {
+                overwritePropertyMap(props, entry.getKey(), entry.getValue(), 
"consumer");
             }
         }
+    }
 
+    private void validateProducerPropertyMap(final Map<String, Object> props) {
         if (eosEnabled) {
-            
verifyMaxInFlightRequestPerConnection(clientProvidedProps.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION));
+            // Iterate over KS_CONTROLLED_CONSUMER_CONFIGS_EOS_ENABLED and 
override values if set
+            for (final Map.Entry<String, Object> entry : 
KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED.entrySet()) {
+                overwritePropertyMap(props, entry.getKey(), entry.getValue(), 
"producer");
+            }
+            
verifyMaxInFlightRequestPerConnection(props.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION));
+
+        }
+
+        if (props.containsKey(ProducerConfig.PARTITIONER_CLASS_CONFIG)) {

Review Comment:
   I am actually not sure off the top of my head what @ableegoldman meant by "it 
would break Kafka Streams" if it is set?
   
   Would be good to clarify if we actually need to do anything about it or not.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1182,41 +1175,64 @@ public class StreamsConfig extends AbstractConfig {
                     WINDOW_SIZE_MS_DOC);
     }
 
-    // this is the list of configs for underlying clients
-    // that streams prefer different default values
-    private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
+    // KS_DEFAULT_PRODUCER_CONFIGS - default producer configs for Kafka Streams
+    private static final Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS;
     static {
         final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>();
         tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, 
"100");
-        PRODUCER_DEFAULT_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+
+        KS_DEFAULT_PRODUCER_CONFIGS = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
 
-    private static final Map<String, Object> PRODUCER_EOS_OVERRIDES;
+    // KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED - default producer configs for 
Kafka Streams with EOS enabled
+    private static final Map<String, Object> 
KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED;
     static {
-        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
+        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>(KS_DEFAULT_PRODUCER_CONFIGS);
         
tempProducerDefaultOverrides.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
Integer.MAX_VALUE);
-        
tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
true);
-        // Reduce the transaction timeout for quicker pending offset 
expiration on broker side.
         
tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
DEFAULT_TRANSACTION_TIMEOUT);
 
-        PRODUCER_EOS_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+        KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+    }
+
+    // KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED - Kafka Streams producer 
configs that cannot be overridden by the user with EOS enabled

Review Comment:
   Or omit the comment after all (same reason as above -- the variable name 
explains itself?)



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1182,41 +1175,64 @@ public class StreamsConfig extends AbstractConfig {
                     WINDOW_SIZE_MS_DOC);
     }
 
-    // this is the list of configs for underlying clients
-    // that streams prefer different default values
-    private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
+    // KS_DEFAULT_PRODUCER_CONFIGS - default producer configs for Kafka Streams
+    private static final Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS;
     static {
         final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>();
         tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, 
"100");
-        PRODUCER_DEFAULT_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+
+        KS_DEFAULT_PRODUCER_CONFIGS = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
 
-    private static final Map<String, Object> PRODUCER_EOS_OVERRIDES;
+    // KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED - default producer configs for 
Kafka Streams with EOS enabled
+    private static final Map<String, Object> 
KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED;
     static {
-        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
+        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>(KS_DEFAULT_PRODUCER_CONFIGS);
         
tempProducerDefaultOverrides.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
Integer.MAX_VALUE);
-        
tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
true);
-        // Reduce the transaction timeout for quicker pending offset 
expiration on broker side.
         
tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
DEFAULT_TRANSACTION_TIMEOUT);
 
-        PRODUCER_EOS_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+        KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
+    }
+
+    // KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED - Kafka Streams producer 
configs that cannot be overridden by the user with EOS enabled
+    private static final Map<String, Object> 
KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED;
+    static {
+        final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>();
+        
tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
true);
+        
tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, null);
+
+        KS_CONTROLLED_PRODUCER_CONFIGS_EOS_ENABLED = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
 
-    private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;
+    // KS_DEFAULT_CONSUMER_CONFIGS - default consumer configs for Kafka Streams

Review Comment:
   or omit (as above)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to