Nikita-Shupletsov commented on code in PR #20149:
URL: https://github.com/apache/kafka/pull/20149#discussion_r2497222180


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1282,33 +1272,59 @@ public class StreamsConfig extends AbstractConfig {
 
     // this is the list of configs for underlying clients
     // that streams prefer different default values
-    private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES = Map.of(ProducerConfig.LINGER_MS_CONFIG, "100");
+    private static final Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS;
+    static {
+        final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>();
+        tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, "100");
+
+        KS_DEFAULT_PRODUCER_CONFIGS = Collections.unmodifiableMap(tempProducerDefaultOverrides);

Review Comment:
   I am not 100% sure that adding the `KS_` prefix is needed.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1282,33 +1272,59 @@ public class StreamsConfig extends AbstractConfig {
 
     // this is the list of configs for underlying clients
     // that streams prefer different default values
-    private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES = Map.of(ProducerConfig.LINGER_MS_CONFIG, "100");
+    private static final Map<String, Object> KS_DEFAULT_PRODUCER_CONFIGS;

Review Comment:
   I understand that this PR is based on
   https://github.com/apache/kafka/pull/12988 and takes a lot of code from it.
   However, because that PR was opened before
   https://github.com/apache/kafka/pull/18305, reapplying its changes reverts
   the changes from the latter.
   
   I would recommend keeping `Map.of`.
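   
   For illustration, a minimal sketch of the two forms side by side. The class
   name `ProducerDefaultsSketch` and both field names are hypothetical, and the
   literal `"linger.ms"` stands in for `ProducerConfig.LINGER_MS_CONFIG` so the
   snippet compiles without Kafka on the classpath:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in class for illustration; not part of StreamsConfig.
public class ProducerDefaultsSketch {
    // Stand-in for ProducerConfig.LINGER_MS_CONFIG (assumption: avoids a Kafka dependency).
    public static final String LINGER_MS_CONFIG = "linger.ms";

    // Form this comment recommends keeping: one immutable-map expression.
    public static final Map<String, Object> WITH_MAP_OF =
        Map.of(LINGER_MS_CONFIG, "100");

    // Form the diff reintroduces: static initializer plus an unmodifiable wrapper.
    public static final Map<String, Object> WITH_STATIC_BLOCK;
    static {
        final Map<String, Object> temp = new HashMap<>();
        temp.put(LINGER_MS_CONFIG, "100");
        WITH_STATIC_BLOCK = Collections.unmodifiableMap(temp);
    }
}
```

   Both maps hold the same entry and both throw
   `UnsupportedOperationException` on `put`, so the shorter form should not
   lose anything for a fixed set of non-null defaults.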



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1939,13 +1927,13 @@ public Map<String, Object> getGlobalConsumerConfigs(final String clientId) {
     public Map<String, Object> getProducerConfigs(final String clientId) {
         final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
 
-        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS);
-
         // generate producer configs from original properties and overridden maps
-        final Map<String, Object> props = new HashMap<>(eosEnabled ? PRODUCER_EOS_OVERRIDES : PRODUCER_DEFAULT_OVERRIDES);
+        final Map<String, Object> props = new HashMap<>(eosEnabled ? KS_DEFAULT_PRODUCER_CONFIGS_EOS_ENABLED : KS_DEFAULT_PRODUCER_CONFIGS);

Review Comment:
   Here we preserved the old behavior for EOS. I would suggest using the same
   approach consistently, if possible.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1676,10 +1692,7 @@ private Map<String, Object> getCommonConsumerConfigs() {
 
         clientProvidedProps.remove(GROUP_PROTOCOL_CONFIG);
 
-        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
-        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
-
-        final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
+        final Map<String, Object> consumerProps = new HashMap<>(KS_DEFAULT_CONSUMER_CONFIGS);
         if (StreamsConfigUtils.eosEnabled(this)) {
             consumerProps.put("internal.throw.on.fetch.stable.offset.unsupported", true);

Review Comment:
   There is a bit of an inconsistency here: for example, we won't warn on this config.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1676,10 +1692,7 @@ private Map<String, Object> getCommonConsumerConfigs() {
 
         clientProvidedProps.remove(GROUP_PROTOCOL_CONFIG);
 
-        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
-        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
-
-        final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
+        final Map<String, Object> consumerProps = new HashMap<>(KS_DEFAULT_CONSUMER_CONFIGS);

Review Comment:
   Since we now want to validate and warn on the default EOS configs, maybe it
   makes sense to validate the rest of the configs here as well?
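   
   A rough sketch of what a uniform check could look like, assuming we simply
   compare every user-supplied key against the Streams-preferred defaults. The
   class `OverrideWarningSketch` and the method `warningsFor` are hypothetical
   names, not existing `StreamsConfig` code, and the sketch returns the
   messages instead of logging them so it stays self-contained:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of StreamsConfig.
public class OverrideWarningSketch {

    // Returns one message per key where the user-supplied value differs
    // from the value Streams would set by default.
    public static List<String> warningsFor(final Map<String, Object> clientProvidedProps,
                                           final Map<String, Object> streamsDefaults) {
        final List<String> warnings = new ArrayList<>();
        for (final Map.Entry<String, Object> entry : streamsDefaults.entrySet()) {
            final String key = entry.getKey();
            if (!clientProvidedProps.containsKey(key)) {
                continue; // user did not touch this config
            }
            // Compare as strings, since configs may arrive as String or as a typed value.
            final String preferred = String.valueOf(entry.getValue());
            final String supplied = String.valueOf(clientProvidedProps.get(key));
            if (!preferred.equals(supplied)) {
                warnings.add("User-supplied value " + supplied + " for " + key
                    + " differs from the Streams default " + preferred);
            }
        }
        return warnings;
    }
}
```

   Passing the same defaults map that seeds `consumerProps` would let the EOS
   and non-EOS configs go through one code path, which is the consistency this
   comment is asking about.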



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.