ijuma commented on a change in pull request #11788:
URL: https://github.com/apache/kafka/pull/11788#discussion_r810402844



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
##########
@@ -461,27 +464,55 @@ private void postProcessAndValidateIdempotenceConfigs(final Map<String, Object>
         final Map<String, Object> originalConfigs = this.originals();
         final String acksStr = parseAcks(this.getString(ACKS_CONFIG));
         configs.put(ACKS_CONFIG, acksStr);
+        final boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
+        boolean idempotenceEnabled = this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
+        boolean shouldDisableIdempotence = false;
 
-        // For idempotence producers, values for `RETRIES_CONFIG` and `ACKS_CONFIG` need validation
-        if (idempotenceEnabled()) {
+        // For idempotence producers, values for `retries` and `acks` and `max.in.flight.requests.per.connection` need validation
+        if (idempotenceEnabled) {
             boolean userConfiguredRetries = originalConfigs.containsKey(RETRIES_CONFIG);
-            if (userConfiguredRetries && this.getInt(RETRIES_CONFIG) == 0) {
-                throw new ConfigException("Must set " + ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent producer.");
+            int retries = this.getInt(RETRIES_CONFIG);
+            if (userConfiguredRetries && retries == 0) {
+                if (userConfiguredIdempotence) {
+                    throw new ConfigException("Must set " + RETRIES_CONFIG + " to non-zero when using the idempotent producer.");
+                }
+                log.warn("`enable.idempotence` will be disabled because {} config is set to 0.", RETRIES_CONFIG, retries);

Review comment:
       Thinking about this some more, I'm not sure there's much value in
forcing users to set `enable.idempotence=false` when they have already set
`acks=0`, `acks=1` or `retries=0`. So I'd log at info level instead of warn
for these cases.
   
   `max.in.flight.requests.per.connection` is different: the fact that
idempotence doesn't work with values above 5 is an implementation constraint
rather than something inherent to the configuration. For this one, I'd keep
the warning and mention that it will become an error in Kafka 4.0.
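
   A minimal sketch of the behaviour suggested above, not the PR's actual
code. The class `IdempotenceSketch`, its `Outcome` and `Level` types, and the
message wording are hypothetical; it assumes `enable.idempotence` defaults to
`true`, uses `IllegalArgumentException` as a stand-in for `ConfigException`,
and only looks at values the user set explicitly.

```java
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Kafka.
final class IdempotenceSketch {
    enum Level { NONE, INFO, WARN }

    // What the producer would end up doing, and how loudly it would say so.
    static final class Outcome {
        final boolean idempotenceEnabled;
        final Level logLevel;
        final String message;

        Outcome(boolean idempotenceEnabled, Level logLevel, String message) {
            this.idempotenceEnabled = idempotenceEnabled;
            this.logLevel = logLevel;
            this.message = message;
        }
    }

    // Idempotence only works with at most this many in-flight requests.
    static final int MAX_IN_FLIGHT_FOR_IDEMPOTENCE = 5;

    static Outcome resolve(Map<String, String> userConfigs) {
        boolean userConfiguredIdempotence = userConfigs.containsKey("enable.idempotence");
        // Assumption: idempotence is on unless the user turns it off.
        boolean idempotenceEnabled =
            Boolean.parseBoolean(userConfigs.getOrDefault("enable.idempotence", "true"));
        if (!idempotenceEnabled) {
            return new Outcome(false, Level.NONE, "");
        }

        String conflict = null;
        Level level = Level.NONE;
        String acks = userConfigs.get("acks");
        String maxInFlight = userConfigs.get("max.in.flight.requests.per.connection");

        if ("0".equals(userConfigs.get("retries"))) {
            // Inherent to the configuration: info is enough.
            conflict = "retries=0";
            level = Level.INFO;
        } else if (acks != null && !acks.equals("all") && !acks.equals("-1")) {
            // Also inherent to the configuration: info is enough.
            conflict = "acks=" + acks;
            level = Level.INFO;
        } else if (maxInFlight != null
                && Integer.parseInt(maxInFlight) > MAX_IN_FLIGHT_FOR_IDEMPOTENCE) {
            // Implementation constraint: warn, and announce the future error.
            conflict = "max.in.flight.requests.per.connection=" + maxInFlight;
            level = Level.WARN;
        }

        if (conflict == null) {
            return new Outcome(true, Level.NONE, "");
        }
        if (userConfiguredIdempotence) {
            // The user asked for idempotence explicitly, so the conflict is an error.
            throw new IllegalArgumentException(
                "Cannot use the idempotent producer with " + conflict + ".");
        }
        String message = "`enable.idempotence` will be disabled because " + conflict + " was set.";
        if (level == Level.WARN) {
            message += " This will become an error in Kafka 4.0.";
        }
        return new Outcome(false, level, message);
    }

    public static void main(String[] args) {
        Outcome outcome = resolve(Map.of("max.in.flight.requests.per.connection", "10"));
        System.out.println(outcome.logLevel + ": " + outcome.message);
    }
}
```

   The point of returning the log level separately is that the two kinds of
conflict share the same "disable idempotence" outcome and differ only in
severity, which is the distinction drawn in the comment above.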



