This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 5fde508 KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings (#11002)
5fde508 is described below
commit 5fde508731e000f528b7995de5f37602639d84df
Author: Cheng Tan <[email protected]>
AuthorDate: Mon Jul 26 13:45:59 2021 -0700
KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings (#11002)
Reviewers: Rajini Sivaram <[email protected]>
---
.../kafka/clients/producer/ProducerConfig.java | 2 +-
.../kafka/clients/producer/KafkaProducerTest.java | 83 ++++++++++++++++++++++
.../sanity_checks/test_verifiable_producer.py | 8 ++-
3 files changed, 90 insertions(+), 3 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 970a70b..0492fbf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -286,7 +286,7 @@ public class ProducerConfig extends AbstractConfig {
Type.STRING,
"all",
in("all", "-1", "0", "1"),
- Importance.HIGH,
+ Importance.LOW,
ACKS_DOC)
.define(COMPRESSION_TYPE_CONFIG, Type.STRING,
"none", Importance.HIGH, COMPRESSION_TYPE_DOC)
.define(BATCH_SIZE_CONFIG, Type.INT, 16384,
atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index f055e12..2784f19 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -173,6 +173,89 @@ public class KafkaProducerTest {
}
@Test
+ public void testAcksAndIdempotenceForIdempotentProducers() {
+ Properties baseProps = new Properties() {{
+ setProperty(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ setProperty(
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
+ setProperty(
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
+ }};
+
+ Properties validProps = new Properties() {{
+ putAll(baseProps);
+ setProperty(ProducerConfig.ACKS_CONFIG, "0");
+ setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+ }};
+ ProducerConfig config = new ProducerConfig(validProps);
+ assertFalse(
+ config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
+ "idempotence should be overwritten");
+ assertEquals(
+ "0",
+ config.getString(ProducerConfig.ACKS_CONFIG),
+ "acks should be overwritten");
+
+ Properties validProps2 = new Properties() {{
+ putAll(baseProps);
+ setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
"transactionalId");
+ }};
+ config = new ProducerConfig(validProps2);
+ assertTrue(
+ config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
+ "idempotence should be set with the default value");
+ assertEquals(
+ "-1",
+ config.getString(ProducerConfig.ACKS_CONFIG),
+ "acks should be set with the default value");
+
+ Properties validProps3 = new Properties() {{
+ putAll(baseProps);
+ setProperty(ProducerConfig.ACKS_CONFIG, "all");
+ setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+ }};
+ config = new ProducerConfig(validProps3);
+
assertFalse(config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
+ "idempotence should be overwritten");
+ assertEquals(
+ "-1",
+ config.getString(ProducerConfig.ACKS_CONFIG),
+ "acks should be overwritten");
+
+ Properties invalidProps = new Properties() {{
+ putAll(baseProps);
+ setProperty(ProducerConfig.ACKS_CONFIG, "0");
+ setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+ setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
"transactionalId");
+ }};
+ assertThrows(
+ ConfigException.class,
+ () -> new ProducerConfig(invalidProps),
+ "Cannot set a transactional.id without also enabling idempotence");
+
+ Properties invalidProps2 = new Properties() {{
+ putAll(baseProps);
+ setProperty(ProducerConfig.ACKS_CONFIG, "1");
+ setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+ }};
+ assertThrows(
+ ConfigException.class,
+ () -> new ProducerConfig(invalidProps2),
+ "Must set acks to all in order to use the idempotent producer");
+
+ Properties invalidProps3 = new Properties() {{
+ putAll(baseProps);
+ setProperty(ProducerConfig.ACKS_CONFIG, "0");
+ setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+ }};
+ assertThrows(
+ ConfigException.class,
+ () -> new ProducerConfig(invalidProps3),
+ "Must set acks to all in order to use the idempotent producer");
+ }
+
+ @Test
public void testMetricsReporterAutoGeneratedClientId() {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9999");
diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
index 55a928f..1aa2109 100644
--- a/tests/kafkatest/sanity_checks/test_verifiable_producer.py
+++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
@@ -49,12 +49,14 @@ class TestVerifiableProducer(Test):
@parametrize(producer_version=str(LATEST_0_9))
@parametrize(producer_version=str(LATEST_0_10_0))
@parametrize(producer_version=str(LATEST_0_10_1))
+ @matrix(producer_version=[str(DEV_BRANCH)], acks=["0", "1", "-1"],
enable_idempotence=[False])
+ @matrix(producer_version=[str(DEV_BRANCH)], acks=["-1"],
enable_idempotence=[True])
@matrix(producer_version=[str(DEV_BRANCH)],
security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all)
@cluster(num_nodes=4)
@matrix(producer_version=[str(DEV_BRANCH)],
security_protocol=['SASL_SSL'], sasl_mechanism=['PLAIN', 'GSSAPI'],
metadata_quorum=quorum.all)
- def test_simple_run(self, producer_version, security_protocol =
'PLAINTEXT', sasl_mechanism='PLAIN',
- metadata_quorum=quorum.zk):
+ def test_simple_run(self, producer_version, acks=None,
enable_idempotence=False, security_protocol = 'PLAINTEXT',
+ sasl_mechanism='PLAIN', metadata_quorum=quorum.zk):
"""
Test that we can start VerifiableProducer on the current branch
snapshot version or against the 0.8.2 jar, and
verify that we can produce a small number of messages.
@@ -72,6 +74,8 @@ class TestVerifiableProducer(Test):
self.kafka.start()
node = self.producer.nodes[0]
+ self.producer.enable_idempotence = enable_idempotence
+ self.producer.acks = acks
node.version = KafkaVersion(producer_version)
self.producer.start()
wait_until(lambda: self.producer.num_acked > 5, timeout_sec=15,