This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 5fde508  KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up 
testings (#11002)
5fde508 is described below

commit 5fde508731e000f528b7995de5f37602639d84df
Author: Cheng Tan <[email protected]>
AuthorDate: Mon Jul 26 13:45:59 2021 -0700

    KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings (#11002)
    
    Reviewers: Rajini Sivaram <[email protected]>
---
 .../kafka/clients/producer/ProducerConfig.java     |  2 +-
 .../kafka/clients/producer/KafkaProducerTest.java  | 83 ++++++++++++++++++++++
 .../sanity_checks/test_verifiable_producer.py      |  8 ++-
 3 files changed, 90 insertions(+), 3 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 970a70b..0492fbf 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -286,7 +286,7 @@ public class ProducerConfig extends AbstractConfig {
                                         Type.STRING,
                                         "all",
                                         in("all", "-1", "0", "1"),
-                                        Importance.HIGH,
+                                        Importance.LOW,
                                         ACKS_DOC)
                                 .define(COMPRESSION_TYPE_CONFIG, Type.STRING, 
"none", Importance.HIGH, COMPRESSION_TYPE_DOC)
                                 .define(BATCH_SIZE_CONFIG, Type.INT, 16384, 
atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index f055e12..2784f19 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -173,6 +173,89 @@ public class KafkaProducerTest {
     }
 
     @Test
+    public void testAcksAndIdempotenceForIdempotentProducers() {
+        Properties baseProps = new Properties() {{
+                setProperty(
+                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+                setProperty(
+                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+                setProperty(
+                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+            }};
+
+        Properties validProps = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.ACKS_CONFIG, "0");
+                setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+            }};
+        ProducerConfig config = new ProducerConfig(validProps);
+        assertFalse(
+            config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
+            "idempotence should be overwritten");
+        assertEquals(
+            "0",
+            config.getString(ProducerConfig.ACKS_CONFIG),
+            "acks should be overwritten");
+
+        Properties validProps2 = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
"transactionalId");
+            }};
+        config = new ProducerConfig(validProps2);
+        assertTrue(
+            config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
+            "idempotence should be set with the default value");
+        assertEquals(
+            "-1",
+            config.getString(ProducerConfig.ACKS_CONFIG),
+            "acks should be set with the default value");
+
+        Properties validProps3 = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.ACKS_CONFIG, "all");
+                setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+            }};
+        config = new ProducerConfig(validProps3);
+        
assertFalse(config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
+            "idempotence should be overwritten");
+        assertEquals(
+            "-1",
+            config.getString(ProducerConfig.ACKS_CONFIG),
+            "acks should be overwritten");
+
+        Properties invalidProps = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.ACKS_CONFIG, "0");
+                setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+                setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
"transactionalId");
+            }};
+        assertThrows(
+            ConfigException.class,
+            () -> new ProducerConfig(invalidProps),
+            "Cannot set a transactional.id without also enabling idempotence");
+
+        Properties invalidProps2 = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.ACKS_CONFIG, "1");
+                setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+            }};
+        assertThrows(
+            ConfigException.class,
+            () -> new ProducerConfig(invalidProps2),
+            "Must set acks to all in order to use the idempotent producer");
+
+        Properties invalidProps3 = new Properties() {{
+                putAll(baseProps);
+                setProperty(ProducerConfig.ACKS_CONFIG, "0");
+                setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+            }};
+        assertThrows(
+            ConfigException.class,
+            () -> new ProducerConfig(invalidProps3),
+            "Must set acks to all in order to use the idempotent producer");
+    }
+
+    @Test
     public void testMetricsReporterAutoGeneratedClientId() {
         Properties props = new Properties();
         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py 
b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
index 55a928f..1aa2109 100644
--- a/tests/kafkatest/sanity_checks/test_verifiable_producer.py
+++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
@@ -49,12 +49,14 @@ class TestVerifiableProducer(Test):
     @parametrize(producer_version=str(LATEST_0_9))
     @parametrize(producer_version=str(LATEST_0_10_0))
     @parametrize(producer_version=str(LATEST_0_10_1))
+    @matrix(producer_version=[str(DEV_BRANCH)], acks=["0", "1", "-1"], 
enable_idempotence=[False])
+    @matrix(producer_version=[str(DEV_BRANCH)], acks=["-1"], 
enable_idempotence=[True])
     @matrix(producer_version=[str(DEV_BRANCH)], 
security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all)
     @cluster(num_nodes=4)
     @matrix(producer_version=[str(DEV_BRANCH)], 
security_protocol=['SASL_SSL'], sasl_mechanism=['PLAIN', 'GSSAPI'],
             metadata_quorum=quorum.all)
-    def test_simple_run(self, producer_version, security_protocol = 
'PLAINTEXT', sasl_mechanism='PLAIN',
-                        metadata_quorum=quorum.zk):
+    def test_simple_run(self, producer_version, acks=None, 
enable_idempotence=False, security_protocol = 'PLAINTEXT',
+                        sasl_mechanism='PLAIN', metadata_quorum=quorum.zk):
         """
         Test that we can start VerifiableProducer on the current branch 
snapshot version or against the 0.8.2 jar, and
         verify that we can produce a small number of messages.
@@ -72,6 +74,8 @@ class TestVerifiableProducer(Test):
         self.kafka.start()
 
         node = self.producer.nodes[0]
+        self.producer.enable_idempotence = enable_idempotence
+        self.producer.acks = acks
         node.version = KafkaVersion(producer_version)
         self.producer.start()
         wait_until(lambda: self.producer.num_acked > 5, timeout_sec=15,

Reply via email to