This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 9f3c912  KAFKA-13759: Disable idempotence by default in producers 
instantiated by Connect (#11933)
9f3c912 is described below

commit 9f3c9120396d99cc21a8862bedf6db2caf2cc885
Author: Konstantine Karantasis <konstant...@confluent.io>
AuthorDate: Wed Mar 23 15:03:52 2022 -0700

    KAFKA-13759: Disable idempotence by default in producers instantiated by 
Connect (#11933)
    
    With AK 3.0, idempotence was enabled by default in Kafka producers. 
However, if idempotence is enabled, Connect won't be able to communicate via 
its producers with Kafka brokers older than version 0.11. Perhaps more 
importantly, for brokers older than version 2.8 the IDEMPOTENT_WRITE ACL is 
required to be granted to the principal of the Connect worker.
    
    Therefore this commit disables producer idempotence by default for all the 
producers instantiated by Connect. Users can still choose to enable producer 
idempotence by explicitly setting the right worker and/or connector properties.
    
    The changes were tested via existing unit, integration and system tests.
    
    Reviewers: Randall Hauch <rha...@gmail.com>
---
 .../src/main/java/org/apache/kafka/connect/runtime/Worker.java |  6 ++++++
 .../apache/kafka/connect/storage/KafkaConfigBackingStore.java  |  6 ++++++
 .../apache/kafka/connect/storage/KafkaOffsetBackingStore.java  |  6 ++++++
 .../apache/kafka/connect/storage/KafkaStatusBackingStore.java  |  7 ++++++-
 .../test/java/org/apache/kafka/connect/runtime/WorkerTest.java |  6 ++++++
 docs/upgrade.html                                              | 10 ++++++++++
 6 files changed, 40 insertions(+), 1 deletion(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 582271a..4adf6ff 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -648,6 +648,12 @@ public class Worker {
         // These settings will execute infinite retries on retriable 
exceptions. They *may* be overridden via configs passed to the worker,
         // but this may compromise the delivery guarantees of Kafka Connect.
         producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 
Long.toString(Long.MAX_VALUE));
+        // By default, Connect disables idempotent behavior for all producers, 
even though idempotence became
+        // default for Kafka producers. This is to ensure Connect continues to 
work with many Kafka broker versions, including older brokers that do not 
support
+        // idempotent producers or require explicit steps to enable them (e.g. 
adding the IDEMPOTENT_WRITE ACL to brokers older than 2.8).
+        // These settings might change when 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent
+        // gets approved and scheduled for release.
+        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
         producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
         
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
         producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
Integer.toString(Integer.MAX_VALUE));
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 669c72b..94b98cb 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -511,6 +511,12 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
Integer.MAX_VALUE);
+        // By default, Connect disables idempotent behavior for all producers, 
even though idempotence became
+        // default for Kafka producers. This is to ensure Connect continues to 
work with many Kafka broker versions, including older brokers that do not 
support
+        // idempotent producers or require explicit steps to enable them (e.g. 
adding the IDEMPOTENT_WRITE ACL to brokers older than 2.8).
+        // These settings might change when 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent
+        // gets approved and scheduled for release.
+        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
         ConnectUtils.addMetricsContextProperties(producerProps, config, 
clusterId);
 
         Map<String, Object> consumerProps = new HashMap<>(originals);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 313baf7..f3cbb68 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -91,6 +91,12 @@ public class KafkaOffsetBackingStore implements 
OffsetBackingStore {
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
Integer.MAX_VALUE);
+        // By default, Connect disables idempotent behavior for all producers, 
even though idempotence became
+        // default for Kafka producers. This is to ensure Connect continues to 
work with many Kafka broker versions, including older brokers that do not 
support
+        // idempotent producers or require explicit steps to enable them (e.g. 
adding the IDEMPOTENT_WRITE ACL to brokers older than 2.8).
+        // These settings might change when 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent
+        // gets approved and scheduled for release.
+        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
         ConnectUtils.addMetricsContextProperties(producerProps, config, 
clusterId);
 
         Map<String, Object> consumerProps = new HashMap<>(originals);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index c2aeba8..3ba6996 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -170,7 +170,12 @@ public class KafkaStatusBackingStore implements 
StatusBackingStore {
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle 
retries in this class
-        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false); // 
disable idempotence since retries is force to 0
+        // By default, Connect disables idempotent behavior for all producers, 
even though idempotence became
+        // default for Kafka producers. This is to ensure Connect continues to 
work with many Kafka broker versions, including older brokers that do not 
support
+        // idempotent producers or require explicit steps to enable them (e.g. 
adding the IDEMPOTENT_WRITE ACL to brokers older than 2.8).
+        // These settings might change when 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent
+        // gets approved and scheduled for release.
+        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); 
// disable idempotence since retries is forced to 0
         ConnectUtils.addMetricsContextProperties(producerProps, config, 
clusterId);
 
         Map<String, Object> consumerProps = new HashMap<>(originals);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 2b21079..dcd9286 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -213,6 +213,12 @@ public class WorkerTest extends ThreadedTest {
         defaultProducerConfigs.put(
             ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
         defaultProducerConfigs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 
Long.toString(Long.MAX_VALUE));
+        // By default, producers that are instantiated and used by Connect 
have idempotency disabled even after idempotency became
+        // default for Kafka producers. This is chosen to avoid breaking 
changes when Connect contacts Kafka brokers that do not support
+        // idempotent producers or require explicit steps to enable them (e.g. 
adding the IDEMPOTENT_WRITE ACL to brokers older than 2.8).
+        // These settings might change when 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent
+        // gets approved and scheduled for release.
+        defaultProducerConfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
"false");
         defaultProducerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
         
defaultProducerConfigs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
 "1");
         defaultProducerConfigs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
Integer.toString(Integer.MAX_VALUE));
diff --git a/docs/upgrade.html b/docs/upgrade.html
index ddcee32..e770027 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -25,6 +25,11 @@
             which meant that idempotence remained disabled unless the user had 
explicitly set <code>enable.idempotence</code> to true
             (See <a 
href="https://issues.apache.org/jira/browse/KAFKA-13598">KAFKA-13598</a>for 
more details).
             This issue was fixed and the default is properly applied in 3.0.1, 
3.1.1, and 3.2.0.</li>
+        <li>A notable exception is Connect, which by default disables idempotent 
behavior for all of its
+            producers in order to uniformly support using a wide range of 
Kafka broker versions. 
+            Users can change this behavior to enable idempotence for some or 
all producers
+            via Connect worker and/or connector configuration. Connect may 
enable idempotent producers
+            by default in a future major release.</li>
     </ul>
 
 <h4><a id="upgrade_3_1_0" href="#upgrade_3_1_0">Upgrading to 3.1.0 from any 
version 0.8.x through 3.0.x</a></h4>
@@ -75,6 +80,11 @@
         A bug prevented the producer idempotence default from being applied 
which meant that it remained disabled unless the user had explicitly set
        <code>enable.idempotence</code> to true. See <a 
href="https://issues.apache.org/jira/browse/KAFKA-13598">KAFKA-13598</a>for 
more details.
         This issue was fixed and the default is properly applied.</li>
+    <li>A notable exception is Connect, which by default disables idempotent 
behavior for all of its
+        producers in order to uniformly support using a wide range of Kafka 
broker versions.
+        Users can change this behavior to enable idempotence for some or all 
producers
+        via Connect worker and/or connector configuration. Connect may enable 
idempotent producers
+        by default in a future major release.</li>
 </ul>
 
 <h5><a id="upgrade_310_notable" href="#upgrade_310_notable">Notable changes in 
3.1.0</a></h5>

Reply via email to