mimaison commented on a change in pull request #11788:
URL: https://github.com/apache/kafka/pull/11788#discussion_r811310081



##########
File path: docs/upgrade.html
##########
@@ -73,7 +74,8 @@ <h5><a id="upgrade_311_notable" 
href="#upgrade_311_notable">Notable changes in 3
 <ul>
     <li>A bug prevented the producer idempotence default from being applied 
which meant that it remained disabled unless the user had explicitly set
        <code>enable.idempotence</code> to true. See <a 
href="https://issues.apache.org/jira/browse/KAFKA-13598";>KAFKA-13598</a>for 
more details.
-        This issue was fixed and the default is properly applied.</li>
+        This issue was fixed and the default is properly applied.
+        However, if <code>enable.idempotence</code> is unset and the other 
configs conflicts with it, idempotence will be disabled to avoid break existing 
producers.</li>

Review comment:
       `break` -> `breaking`
   Same below

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
##########
@@ -461,27 +464,56 @@ private void 
postProcessAndValidateIdempotenceConfigs(final Map<String, Object>
         final Map<String, Object> originalConfigs = this.originals();
         final String acksStr = parseAcks(this.getString(ACKS_CONFIG));
         configs.put(ACKS_CONFIG, acksStr);
+        final boolean userConfiguredIdempotence = 
this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
+        boolean idempotenceEnabled = 
this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
+        boolean shouldDisableIdempotence = false;
 
-        // For idempotence producers, values for `RETRIES_CONFIG` and 
`ACKS_CONFIG` need validation
-        if (idempotenceEnabled()) {
+        // For idempotence producers, values for `retries` and `acks` and 
`max.in.flight.requests.per.connection` need validation
+        if (idempotenceEnabled) {
             boolean userConfiguredRetries = 
originalConfigs.containsKey(RETRIES_CONFIG);
-            if (userConfiguredRetries && this.getInt(RETRIES_CONFIG) == 0) {
-                throw new ConfigException("Must set " + 
ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent 
producer.");
+            int retries = this.getInt(RETRIES_CONFIG);
+            if (userConfiguredRetries && retries == 0) {
+                if (userConfiguredIdempotence) {
+                    throw new ConfigException("Must set " + RETRIES_CONFIG + " 
to non-zero when using the idempotent producer.");
+                }
+                log.info("`enable.idempotence` will be disabled because {} 
config is set to 0.", RETRIES_CONFIG, retries);

Review comment:
       I think it would read better if we remove `config` from the sentence.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to