This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c3e7c0b MINOR: Producers should set delivery timeout instead of retries (#5425)
c3e7c0b is described below
commit c3e7c0bcb258061568294c0d96b62fea94ef8ee7
Author: Jason Gustafson <[email protected]>
AuthorDate: Wed Aug 1 11:04:17 2018 -0700
MINOR: Producers should set delivery timeout instead of retries (#5425)
Use delivery timeout instead of retries when possible and remove various
TODOs associated with completion of KIP-91.
Reviewers: Ismael Juma <[email protected]>, Guozhang Wang <[email protected]>
---
.../kafka/clients/producer/KafkaProducer.java | 2 +-
.../kafka/clients/producer/ProducerConfig.java | 24 ++++++++++++++--------
.../org/apache/kafka/connect/runtime/Worker.java | 1 -
.../connect/storage/KafkaConfigBackingStore.java | 2 +-
.../connect/storage/KafkaOffsetBackingStore.java | 2 +-
core/src/main/scala/kafka/tools/MirrorMaker.scala | 10 ++++-----
.../kafka/api/AuthorizerIntegrationTest.scala | 2 --
.../kafka/api/BaseProducerSendTest.scala | 6 ++++--
.../kafka/api/EndToEndAuthorizationTest.scala | 10 ++++-----
.../kafka/api/IntegrationTestHarness.scala | 20 +++++++++---------
.../kafka/api/PlaintextConsumerTest.scala | 2 +-
.../kafka/api/PlaintextProducerSendTest.scala | 3 +--
.../kafka/api/ProducerFailureHandlingTest.scala | 6 +++---
.../kafka/api/RackAwareAutoTopicCreationTest.scala | 2 +-
.../SaslClientsWithInvalidCredentialsTest.scala | 1 -
.../server/DynamicBrokerReconfigurationTest.scala | 17 +++++++++------
.../other/kafka/ReplicationQuotasTestRig.scala | 2 +-
.../FetchRequestDownConversionConfigTest.scala | 2 +-
.../scala/unit/kafka/server/FetchRequestTest.scala | 9 ++++----
.../unit/kafka/server/LogDirFailureTest.scala | 16 ++++++++++-----
.../scala/unit/kafka/server/LogRecoveryTest.scala | 1 -
.../scala/unit/kafka/server/ReplicaFetchTest.scala | 1 -
.../unit/kafka/server/ReplicationQuotasTest.scala | 8 ++++----
.../unit/kafka/server/ServerShutdownTest.scala | 1 -
...chDrivenReplicationProtocolAcceptanceTest.scala | 12 +++++------
.../server/epoch/LeaderEpochIntegrationTest.scala | 7 ++++---
.../test/scala/unit/kafka/utils/TestUtils.scala | 23 ++++++++-------------
docs/upgrade.html | 4 ++++
.../kafka/log4jappender/KafkaLog4jAppender.java | 23 +++++++++++++++------
.../org/apache/kafka/streams/StreamsConfig.java | 11 +++++-----
.../apache/kafka/streams/StreamsConfigTest.java | 14 ++++++-------
.../integration/QueryableStateIntegrationTest.java | 1 -
.../apache/kafka/streams/perf/SimpleBenchmark.java | 3 ---
.../apache/kafka/streams/perf/YahooBenchmark.java | 2 --
.../streams/tests/BrokerCompatibilityTest.java | 2 --
.../apache/kafka/streams/tests/EosTestClient.java | 3 ---
.../kafka/streams/tests/SmokeTestClient.java | 4 ----
.../kafka/streams/tests/SmokeTestDriver.java | 5 +----
.../kafka/tools/TransactionalMessageCopier.java | 6 ------
39 files changed, 133 insertions(+), 137 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index b40b09a..2a35b30 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -96,7 +96,7 @@ import static
org.apache.kafka.common.serialization.ExtendedSerializer.Wrapper.e
* Properties props = new Properties();
* props.put("bootstrap.servers", "localhost:9092");
* props.put("acks", "all");
- * props.put("retries", 0);
+ * props.put("delivery.timeout.ms", 30000);
* props.put("batch.size", 16384);
* props.put("linger.ms", 1);
* props.put("buffer.memory", 33554432);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index ab55353..6b37e3c 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -107,10 +107,14 @@ public class ProducerConfig extends AbstractConfig {
/** <code>delivery.timeout.ms</code> */
public static final String DELIVERY_TIMEOUT_MS_CONFIG =
"delivery.timeout.ms";
- private static final String DELIVERY_TIMEOUT_MS_DOC = "An upper bound on
the time to report success or failure after Producer.send() returns. "
- + "Producer may
report failure to send a message earlier than this config if all the retries
are exhausted or "
- + "a record is added
to a batch nearing expiration. " + DELIVERY_TIMEOUT_MS_CONFIG + "should be
equal to or "
- + "greater than " +
REQUEST_TIMEOUT_MS_CONFIG + " + " + LINGER_MS_CONFIG;
+ private static final String DELIVERY_TIMEOUT_MS_DOC = "An upper bound on
the time to report success or failure "
+ + "after a call to <code>send()</code> returns. This limits the
total time that a record will be delayed "
+ + "prior to sending, the time to await acknowledgement from the
broker (if expected), and the time allowed "
+ + "for retriable send failures. The producer may report failure to
send a record earlier than this config if "
+ + "either an unrecoverable error is encountered, the retries have
been exhausted, "
+ + "or the record is added to a batch which reached an earlier
delivery expiration deadline."
+ + "The value of this config should be greater than or equal to the
sum of " + REQUEST_TIMEOUT_MS_CONFIG
+ + " and " + LINGER_MS_CONFIG + ". ";
/** <code>client.id</code> */
public static final String CLIENT_ID_CONFIG =
CommonClientConfigs.CLIENT_ID_CONFIG;
@@ -181,10 +185,14 @@ public class ProducerConfig extends AbstractConfig {
/** <code>retries</code> */
public static final String RETRIES_CONFIG =
CommonClientConfigs.RETRIES_CONFIG;
private static final String RETRIES_DOC = "Setting a value greater than
zero will cause the client to resend any record whose send fails with a
potentially transient error."
- + " Note that this retry is no
different than if the client resent the record upon receiving the error."
- + " Allowing retries without
setting <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to 1 will
potentially change the"
- + " ordering of records because
if two batches are sent to a single partition, and the first fails and is
retried but the second"
- + " succeeds, then the records
in the second batch may appear first.";
+ + " Note that this retry is no different than if the client resent
the record upon receiving the error."
+ + " Allowing retries without setting <code>" +
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to 1 will potentially change
the"
+ + " ordering of records because if two batches are sent to a
single partition, and the first fails and is retried but the second"
+ + " succeeds, then the records in the second batch may appear
first. Note additionally that produce requests will be"
+ + " failed before the number of retries has been exhausted if the
timeout configured by"
+ + " " + DELIVERY_TIMEOUT_MS_CONFIG + " expires first before
successful acknowledgement. Users should generally"
+ + " prefer to leave this config unset and instead use " +
DELIVERY_TIMEOUT_MS_CONFIG + " to control"
+ + " retry behavior.";
/** <code>key.serializer</code> */
public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
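The new doc text pins down the relationship between the three timeouts. A quick illustration of the documented constraint follows (values are hypothetical; 120000 ms is the default delivery timeout noted in docs/upgrade.html below):

    public class DeliveryTimeoutConstraint {
        public static void main(String[] args) {
            int requestTimeoutMs  = 30000;   // request.timeout.ms
            int lingerMs          = 100;     // linger.ms
            int deliveryTimeoutMs = 120000;  // delivery.timeout.ms (client default)

            // delivery.timeout.ms should be >= request.timeout.ms + linger.ms
            if (deliveryTimeoutMs < requestTimeoutMs + lingerMs)
                throw new IllegalArgumentException(
                    "delivery.timeout.ms must be >= request.timeout.ms + linger.ms");
        }
    }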
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 1f62103..e2fe6b6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -135,7 +135,6 @@ public class Worker {
// These settings are designed to ensure there is no data loss. They
*may* be overridden via configs passed to the
// worker, but this may compromise the delivery guarantees of Kafka
Connect.
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,
Integer.toString(Integer.MAX_VALUE));
- producerProps.put(ProducerConfig.RETRIES_CONFIG,
Integer.toString(Integer.MAX_VALUE));
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,
Long.toString(Long.MAX_VALUE));
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index ea19665..e7ee632 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -415,7 +415,7 @@ public class KafkaConfigBackingStore implements
ConfigBackingStore {
Map<String, Object> producerProps = new HashMap<>(originals);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
- producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
+ producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG,
Integer.MAX_VALUE);
Map<String, Object> consumerProps = new HashMap<>(originals);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index fb8ad97..195c498 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -71,7 +71,7 @@ public class KafkaOffsetBackingStore implements
OffsetBackingStore {
Map<String, Object> producerProps = new HashMap<>(originals);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
- producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
+ producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG,
Integer.MAX_VALUE);
Map<String, Object> consumerProps = new HashMap<>(originals);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
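Both Connect backing stores keep the "retry effectively forever" behaviour, but now express it as an unbounded delivery timeout rather than an unbounded retry count. A stripped-down sketch of that producer configuration (class and method names are illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class BackingStoreProducerProps {
        public static Map<String, Object> props() {
            Map<String, Object> producerProps = new HashMap<>();
            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            // Previously: RETRIES_CONFIG = Integer.MAX_VALUE
            producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
            return producerProps;
        }
    }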
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala
b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 9cc6ebe..d7e09e4 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -32,7 +32,7 @@ import
org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, Consu
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,
ProducerRecord, RecordMetadata}
import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.serialization.{ByteArrayDeserializer,
ByteArraySerializer}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.record.RecordBatch
@@ -50,7 +50,7 @@ import scala.util.control.ControlThrowable
* @note For mirror maker, the following settings are set by default to make
sure there is no data loss:
* 1. use producer with following settings
* acks=all
- * retries=max integer
+ * delivery.timeout.ms=max integer
* max.block.ms=max long
* max.in.flight.requests.per.connection=1
* 2. Consumer Settings
@@ -193,13 +193,13 @@ object MirrorMaker extends Logging with KafkaMetricsGroup
{
val sync = producerProps.getProperty("producer.type",
"async").equals("sync")
producerProps.remove("producer.type")
// Defaults to no data loss settings.
- maybeSetDefaultProperty(producerProps, ProducerConfig.RETRIES_CONFIG,
Int.MaxValue.toString)
+ maybeSetDefaultProperty(producerProps,
ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Int.MaxValue.toString)
maybeSetDefaultProperty(producerProps,
ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)
maybeSetDefaultProperty(producerProps, ProducerConfig.ACKS_CONFIG, "all")
maybeSetDefaultProperty(producerProps,
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
// Always set producer key and value serializer to ByteArraySerializer.
- producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer")
- producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer")
+ producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
classOf[ByteArraySerializer].getName)
+ producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
classOf[ByteArraySerializer].getName)
producer = new MirrorMakerProducer(sync, producerProps)
// Create consumers
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index c6b3af2..dc9ca85 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1442,7 +1442,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val transactionalProperties = new Properties()
transactionalProperties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
transactionalId)
val producer =
TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
- retries = 3,
props = Some(transactionalProperties))
producers += producer
producer
@@ -1452,7 +1451,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val idempotentProperties = new Properties()
idempotentProperties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
"true")
val producer =
TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
- retries = 3,
props = Some(idempotentProperties))
producers += producer
producer
diff --git
a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 739675e..ad44425 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -68,9 +68,11 @@ abstract class BaseProducerSendTest extends
KafkaServerTestHarness {
super.tearDown()
}
- protected def createProducer(brokerList: String, retries: Int = 0, lingerMs:
Int = 0, props: Option[Properties] = None):
KafkaProducer[Array[Byte],Array[Byte]] = {
+ protected def createProducer(brokerList: String,
+ lingerMs: Int = 0,
+ props: Option[Properties] = None):
KafkaProducer[Array[Byte],Array[Byte]] = {
val producer = TestUtils.createProducer(brokerList, securityProtocol =
securityProtocol, trustStoreFile = trustStoreFile,
- saslProperties = clientSaslProperties, retries = retries, lingerMs =
lingerMs, props = props)
+ saslProperties = clientSaslProperties, lingerMs = lingerMs, props =
props)
registerProducer(producer)
}
diff --git
a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index a002309..7ea761f 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -189,11 +189,11 @@ abstract class EndToEndAuthorizationTest extends
IntegrationTestHarness with Sas
override def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
TestUtils.createProducer(brokerList,
- maxBlockMs = 3000L,
- securityProtocol = this.securityProtocol,
- trustStoreFile = this.trustStoreFile,
- saslProperties = this.clientSaslProperties,
- props = Some(producerConfig))
+ maxBlockMs = 3000L,
+ securityProtocol = this.securityProtocol,
+ trustStoreFile = this.trustStoreFile,
+ saslProperties = this.clientSaslProperties,
+ props = Some(producerConfig))
}
/**
diff --git
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 053f04e..4601417 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -93,19 +93,19 @@ abstract class IntegrationTestHarness extends
KafkaServerTestHarness {
}
def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
- TestUtils.createProducer(brokerList,
- securityProtocol = this.securityProtocol,
- trustStoreFile = this.trustStoreFile,
- saslProperties = this.clientSaslProperties,
- props = Some(producerConfig))
+ TestUtils.createProducer(brokerList,
+ securityProtocol = this.securityProtocol,
+ trustStoreFile = this.trustStoreFile,
+ saslProperties = this.clientSaslProperties,
+ props = Some(producerConfig))
}
def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
- TestUtils.createConsumer(brokerList,
- securityProtocol = this.securityProtocol,
- trustStoreFile = this.trustStoreFile,
- saslProperties = this.clientSaslProperties,
- props = Some(consumerConfig))
+ TestUtils.createConsumer(brokerList,
+ securityProtocol = this.securityProtocol,
+ trustStoreFile = this.trustStoreFile,
+ saslProperties = this.clientSaslProperties,
+ props = Some(consumerConfig))
}
@After
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index ba4df7d..0c8c771 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -619,7 +619,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG,
CompressionType.GZIP.name)
producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG,
Int.MaxValue.toString)
val producer = TestUtils.createProducer(brokerList, securityProtocol =
securityProtocol, trustStoreFile = trustStoreFile,
- saslProperties = clientSaslProperties, retries = 0, lingerMs =
Int.MaxValue, props = Some(producerProps))
+ saslProperties = clientSaslProperties, lingerMs = Int.MaxValue, props
= Some(producerProps))
(0 until numRecords).foreach { i =>
producer.send(new ProducerRecord(tp.topic, tp.partition, i.toLong, s"key
$i".getBytes, s"value $i".getBytes))
}
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 1da6f9e..8ae3952 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -70,8 +70,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
*/
@Test
def testAutoCreateTopic() {
- val producer = createProducer(brokerList, retries = 5)
-
+ val producer = createProducer(brokerList)
try {
// Send a message to auto-create the topic
val record = new ProducerRecord(topic, null, "key".getBytes,
"value".getBytes)
diff --git
a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 0227690..b7d3ecb 100644
---
a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++
b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -64,11 +64,11 @@ class ProducerFailureHandlingTest extends
KafkaServerTestHarness {
override def setUp() {
super.setUp()
- producer1 = TestUtils.createProducer(brokerList, acks = 0,
requestTimeoutMs = 30000, maxBlockMs = 10000L,
+ producer1 = TestUtils.createProducer(brokerList, acks = 0, retries = 0,
requestTimeoutMs = 30000, maxBlockMs = 10000L,
bufferSize = producerBufferSize)
- producer2 = TestUtils.createProducer(brokerList, acks = 1,
requestTimeoutMs = 30000, maxBlockMs = 10000L,
+ producer2 = TestUtils.createProducer(brokerList, acks = 1, retries = 0,
requestTimeoutMs = 30000, maxBlockMs = 10000L,
bufferSize = producerBufferSize)
- producer3 = TestUtils.createProducer(brokerList, acks = -1,
requestTimeoutMs = 30000, maxBlockMs = 10000L,
+ producer3 = TestUtils.createProducer(brokerList, acks = -1, retries = 0,
requestTimeoutMs = 30000, maxBlockMs = 10000L,
bufferSize = producerBufferSize)
}
diff --git
a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
index bfd2924..5fc626b 100644
---
a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
@@ -44,7 +44,7 @@ class RackAwareAutoTopicCreationTest extends
KafkaServerTestHarness with RackAwa
@Test
def testAutoCreateTopic() {
- val producer = TestUtils.createProducer(brokerList, retries = 5)
+ val producer = TestUtils.createProducer(brokerList)
try {
// Send a message to auto-create the topic
val record = new ProducerRecord(topic, null, "key".getBytes,
"value".getBytes)
diff --git
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index b58ba74..ebc587e 100644
---
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -246,7 +246,6 @@ class SaslClientsWithInvalidCredentialsTest extends
IntegrationTestHarness with
val txProducer = TestUtils.createProducer(brokerList,
securityProtocol = this.securityProtocol,
saslProperties = this.clientSaslProperties,
- retries = 1000,
acks = -1,
props = Some(producerConfig))
producers += txProducer
diff --git
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 61d5919..5694cff 100644
---
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -304,7 +304,7 @@ class DynamicBrokerReconfigurationTest extends
ZooKeeperTestHarness with SaslSet
@Test
def testLogCleanerConfig(): Unit = {
- val (producerThread, consumerThread) = startProduceConsume(0)
+ val (producerThread, consumerThread) = startProduceConsume(retries = 0)
verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 1)
@@ -437,7 +437,7 @@ class DynamicBrokerReconfigurationTest extends
ZooKeeperTestHarness with SaslSet
def testUncleanLeaderElectionEnable(): Unit = {
val topic = "testtopic2"
TestUtils.createTopic(zkClient, topic, 1, replicationFactor = 2, servers)
- val producer = ProducerBuilder().maxRetries(1000).acks(1).build()
+ val producer = ProducerBuilder().acks(1).build()
val consumer =
ConsumerBuilder("unclean-leader-test").enableAutoCommit(false).topic(topic).build()
verifyProduceConsume(producer, consumer, numRecords = 10, topic)
consumer.commitSync()
@@ -543,7 +543,7 @@ class DynamicBrokerReconfigurationTest extends
ZooKeeperTestHarness with SaslSet
def verifyThreadPoolResize(propName: String, currentSize: => Int,
threadPrefix: String, mayReceiveDuplicates: Boolean): Unit = {
maybeVerifyThreadPoolSize(propName, currentSize, threadPrefix)
val numRetries = if (mayReceiveDuplicates) 100 else 0
- val (producerThread, consumerThread) = startProduceConsume(numRetries)
+ val (producerThread, consumerThread) = startProduceConsume(retries =
numRetries)
var threadPoolSize = currentSize
(1 to 2).foreach { _ =>
threadPoolSize = reducePoolSize(propName, threadPoolSize, threadPrefix)
@@ -736,6 +736,7 @@ class DynamicBrokerReconfigurationTest extends
ZooKeeperTestHarness with SaslSet
val producer1 = ProducerBuilder().trustStoreProps(sslProperties1)
.maxRetries(0)
.requestTimeoutMs(1000)
+ .deliveryTimeoutMs(1000)
.bootstrapServers(bootstrap)
.build()
@@ -1366,18 +1367,21 @@ class DynamicBrokerReconfigurationTest extends
ZooKeeperTestHarness with SaslSet
}
private case class ProducerBuilder() extends
ClientBuilder[KafkaProducer[String, String]] {
- private var _retries = 0
+ private var _retries = Int.MaxValue
private var _acks = -1
private var _requestTimeoutMs = 30000
+ private var _deliveryTimeoutMs = 30000
def maxRetries(retries: Int): ProducerBuilder = { _retries = retries; this
}
def acks(acks: Int): ProducerBuilder = { _acks = acks; this }
def requestTimeoutMs(timeoutMs: Int): ProducerBuilder = {
_requestTimeoutMs = timeoutMs; this }
+ def deliveryTimeoutMs(timeoutMs: Int): ProducerBuilder = {
_deliveryTimeoutMs = timeoutMs; this }
override def build(): KafkaProducer[String, String] = {
val producer = TestUtils.createProducer(bootstrapServers,
acks = _acks,
requestTimeoutMs = _requestTimeoutMs,
+ deliveryTimeoutMs = _deliveryTimeoutMs,
retries = _retries,
securityProtocol = _securityProtocol,
trustStoreFile = Some(trustStoreFile1),
@@ -1417,8 +1421,9 @@ class DynamicBrokerReconfigurationTest extends
ZooKeeperTestHarness with SaslSet
}
}
- private class ProducerThread(clientId: String, retries: Int) extends
- ShutdownableThread(clientId, isInterruptible = false) {
+ private class ProducerThread(clientId: String, retries: Int)
+ extends ShutdownableThread(clientId, isInterruptible = false) {
+
private val producer =
ProducerBuilder().maxRetries(retries).clientId(clientId).build()
val lastSent = new ConcurrentHashMap[Int, Int]()
@volatile var sent = 0
diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
index 98e568b..b2568c1 100644
--- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
+++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -127,7 +127,7 @@ object ReplicationQuotasTestRig {
createTopic(zkClient, topicName, replicas, servers)
println("Writing Data")
- val producer =
TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
retries = 5, acks = 0)
+ val producer =
TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers), acks =
0)
(0 until config.msgsPerPartition).foreach { x =>
(0 until config.partitions).foreach { partition =>
producer.send(new ProducerRecord(topicName, partition, null, new
Array[Byte](config.msgSize)))
diff --git
a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
index 1e762b3..1bf6f28 100644
---
a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
+++
b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
@@ -52,7 +52,7 @@ class FetchRequestDownConversionConfigTest extends
BaseRequestTest {
private def initProducer(): Unit = {
producer =
TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
- retries = 5, keySerializer = new StringSerializer, valueSerializer = new
StringSerializer)
+ keySerializer = new StringSerializer, valueSerializer = new
StringSerializer)
}
private def createTopics(numTopics: Int, numPartitions: Int,
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 57aca1e..388b0f8 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -70,7 +70,7 @@ class FetchRequestTest extends BaseRequestTest {
private def initProducer(): Unit = {
producer =
TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
- retries = 5, keySerializer = new StringSerializer, valueSerializer = new
StringSerializer)
+ keySerializer = new StringSerializer, valueSerializer = new
StringSerializer)
}
@Test
@@ -204,8 +204,8 @@ class FetchRequestTest extends BaseRequestTest {
val propsOverride = new Properties
propsOverride.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize.toString)
val producer =
TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
- retries = 5, lingerMs = Int.MaxValue,
- keySerializer = new StringSerializer, valueSerializer = new
ByteArraySerializer, props = Some(propsOverride))
+ lingerMs = Int.MaxValue, keySerializer = new StringSerializer,
+ valueSerializer = new ByteArraySerializer, props = Some(propsOverride))
val bytes = new Array[Byte](msgValueLen)
val futures = try {
(0 to 1000).map { _ =>
@@ -263,7 +263,8 @@ class FetchRequestTest extends BaseRequestTest {
// Increase linger so that we have control over the batches created
producer =
TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
retries = 5, keySerializer = new StringSerializer, valueSerializer = new
StringSerializer,
- lingerMs = 300 * 1000)
+ lingerMs = 30 * 1000,
+ deliveryTimeoutMs = 60 * 1000)
val topicConfig = Map(LogConfig.MessageFormatVersionProp ->
KAFKA_0_11_0_IV2.version)
val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions
= 1, topicConfig).head
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index 2087363..9b32cd2 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -24,9 +24,8 @@ import kafka.server.LogDirFailureTest._
import kafka.api.IntegrationTestHarness
import kafka.controller.{OfflineReplica, PartitionAndReplica}
import kafka.utils.{CoreUtils, Exit, TestUtils}
-
import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,
ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.errors.{KafkaStorageException,
NotLeaderForPartitionException}
@@ -47,8 +46,6 @@ class LogDirFailureTest extends IntegrationTestHarness {
private val partitionNum = 12
this.logDirCount = 3
- this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
- this.producerConfig.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,
"100")
this.serverConfig.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp,
"60000")
this.serverConfig.setProperty(KafkaConfig.NumReplicaFetchersProp, "1")
@@ -58,6 +55,16 @@ class LogDirFailureTest extends IntegrationTestHarness {
createTopic(topic, partitionNum, serverCount)
}
+ override def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
+ TestUtils.createProducer(brokerList,
+ retries = 0,
+ securityProtocol = this.securityProtocol,
+ trustStoreFile = this.trustStoreFile,
+ saslProperties = this.clientSaslProperties,
+ props = Some(producerConfig))
+ }
+
+
@Test
def testIOExceptionDuringLogRoll() {
testProduceAfterLogDirFailureOnLeader(Roll)
@@ -175,7 +182,6 @@ class LogDirFailureTest extends IntegrationTestHarness {
case t: NotLeaderForPartitionException => // This may happen if
ProduceRequest version <= 3
case t: Throwable => fail(s"send() should fail with either
KafkaStorageException or NotLeaderForPartitionException instead of
${t.toString}")
}
- case e: Throwable => fail(s"send() should fail with either
KafkaStorageException or NotLeaderForPartitionException instead of
${e.toString}")
}
// Wait for producer to update metadata for the partition
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 1bd15f7..82b95a8 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -67,7 +67,6 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
producer.close()
producer = TestUtils.createProducer(
TestUtils.getBrokerListStrFromServers(servers),
- retries = 5,
keySerializer = new IntegerSerializer,
valueSerializer = new StringSerializer
)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index 0dd22f1..8eba824 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -56,7 +56,6 @@ class ReplicaFetchTest extends ZooKeeperTestHarness {
// send test messages to leader
val producer =
TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(brokers),
- retries = 5,
keySerializer = new
StringSerializer,
valueSerializer = new
StringSerializer)
val records = testMessageList1.map(m => new ProducerRecord(topic1, m, m))
++
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 0bbe637..5125486 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -114,7 +114,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
adminZkClient.changeTopicConfig(topic,
propsWith(FollowerReplicationThrottledReplicasProp,
"0:106,1:106,2:106,3:107,4:107,5:107"))
//Add data equally to each partition
- producer = createProducer(getBrokerListStrFromServers(brokers), retries =
5, acks = 1)
+ producer = createProducer(getBrokerListStrFromServers(brokers), acks = 1)
(0 until msgCount).foreach { _ =>
(0 to 7).foreach { partition =>
producer.send(new ProducerRecord(topic, partition, null, msg))
@@ -203,14 +203,14 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
val throttledTook = System.currentTimeMillis() - start
- assertTrue((s"Throttled replication of ${throttledTook}ms should be >
${expectedDuration * 1000 * 0.9}ms"),
+ assertTrue(s"Throttled replication of ${throttledTook}ms should be >
${expectedDuration * 1000 * 0.9}ms",
throttledTook > expectedDuration * 1000 * 0.9)
- assertTrue((s"Throttled replication of ${throttledTook}ms should be <
${expectedDuration * 1500}ms"),
+ assertTrue(s"Throttled replication of ${throttledTook}ms should be <
${expectedDuration * 1500}ms",
throttledTook < expectedDuration * 1000 * 1.5)
}
def addData(msgCount: Int, msg: Array[Byte]): Unit = {
- producer = createProducer(getBrokerListStrFromServers(brokers), retries =
5, acks = 0)
+ producer = createProducer(getBrokerListStrFromServers(brokers), acks = 0)
(0 until msgCount).map(_ => producer.send(new ProducerRecord(topic,
msg))).foreach(_.get)
waitForOffsetsToMatch(msgCount, 0, 100)
}
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 9f966b4..ff09749 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -54,7 +54,6 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
def createProducer(server: KafkaServer): KafkaProducer[Integer, String] =
TestUtils.createProducer(
TestUtils.getBrokerListStrFromServers(Seq(server)),
- retries = 5,
keySerializer = new IntegerSerializer,
valueSerializer = new StringSerializer
)
diff --git
a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 3dcf4ff..149f05f 100644
---
a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++
b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -305,7 +305,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends
ZooKeeperTestHarness
TestUtils.createTopic(zkClient, topic, Map(0 -> Seq(100, 101)), brokers,
CoreUtils.propsWith((KafkaConfig.MinInSyncReplicasProp, "1")))
- producer = TestUtils.createProducer(getBrokerListStrFromServers(brokers),
retries = 5, acks = 1)
+ producer = TestUtils.createProducer(getBrokerListStrFromServers(brokers),
acks = 1)
// Write one message while both brokers are up
(0 until 1).foreach { i =>
@@ -328,7 +328,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends
ZooKeeperTestHarness
//Bounce the producer (this is required, probably because the broker port
changes on restart?)
producer.close()
- producer = TestUtils.createProducer(getBrokerListStrFromServers(brokers),
retries = 5, acks = 1)
+ producer = TestUtils.createProducer(getBrokerListStrFromServers(brokers),
acks = 1)
//Write 3 messages
(0 until 3).foreach { i =>
@@ -340,7 +340,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends
ZooKeeperTestHarness
//Bounce the producer (this is required, probably because the broker port
changes on restart?)
producer.close()
- producer = TestUtils.createProducer(getBrokerListStrFromServers(brokers),
retries = 5, acks = 1)
+ producer = TestUtils.createProducer(getBrokerListStrFromServers(brokers),
acks = 1)
//Write 1 message
(0 until 1).foreach { i =>
@@ -352,7 +352,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends
ZooKeeperTestHarness
//Bounce the producer (this is required, probably because the broker port
changes on restart?)
producer.close()
- producer = TestUtils.createProducer(getBrokerListStrFromServers(brokers),
retries = 5, acks = 1)
+ producer = TestUtils.createProducer(getBrokerListStrFromServers(brokers),
acks = 1)
//Write 2 messages
(0 until 2).foreach { i =>
@@ -411,7 +411,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends
ZooKeeperTestHarness
}
private def createBufferingProducer: KafkaProducer[Array[Byte], Array[Byte]]
= {
- TestUtils.createProducer(getBrokerListStrFromServers(brokers), retries =
5, acks = -1, lingerMs = 10000,
+ TestUtils.createProducer(getBrokerListStrFromServers(brokers), acks = -1,
lingerMs = 10000,
props = Option(CoreUtils.propsWith(
(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(msg.length * 1000))
, (ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy")
@@ -451,7 +451,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends
ZooKeeperTestHarness
}
private def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
- TestUtils.createProducer(getBrokerListStrFromServers(brokers), retries =
5, acks = -1)
+ TestUtils.createProducer(getBrokerListStrFromServers(brokers), acks = -1)
}
private def leader(): KafkaServer = {
diff --git
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index 4b5a092..a6b7732 100644
---
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -104,7 +104,7 @@ class LeaderEpochIntegrationTest extends
ZooKeeperTestHarness with Logging {
TestUtils.createTopic(zkClient, topic2, assignment2, brokers)
//Send messages equally to the two partitions, then half as many to a third
- producer = createProducer(getBrokerListStrFromServers(brokers), retries =
5, acks = -1)
+ producer = createProducer(getBrokerListStrFromServers(brokers), acks = -1)
(0 until 10).foreach { _ =>
producer.send(new ProducerRecord(topic1, 0, null, "IHeartLogs".getBytes))
}
@@ -144,7 +144,7 @@ class LeaderEpochIntegrationTest extends
ZooKeeperTestHarness with Logging {
brokers = Seq(100, 101).map { id =>
createServer(fromProps(createBrokerConfig(id, zkConnect))) }
def leo() =
brokers(1).replicaManager.getReplica(tp).get.logEndOffset.messageOffset
TestUtils.createTopic(zkClient, tp.topic, Map(tp.partition -> Seq(101)),
brokers)
- producer = createProducer(getBrokerListStrFromServers(brokers), retries =
10, acks = -1)
+ producer = createProducer(getBrokerListStrFromServers(brokers), acks = -1)
//1. Given a single message
producer.send(new ProducerRecord(tp.topic, tp.partition, null,
"IHeartLogs".getBytes)).get
@@ -251,7 +251,8 @@ class LeaderEpochIntegrationTest extends
ZooKeeperTestHarness with Logging {
private def sendFourMessagesToEachTopic() = {
val testMessageList1 = List("test1", "test2", "test3", "test4")
val testMessageList2 = List("test5", "test6", "test7", "test8")
- val producer =
TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(brokers),
retries = 5, keySerializer = new StringSerializer, valueSerializer = new
StringSerializer)
+ val producer =
TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(brokers),
+ keySerializer = new StringSerializer, valueSerializer = new
StringSerializer)
val records =
testMessageList1.map(m => new ProducerRecord(topic1, m, m)) ++
testMessageList2.map(m => new ProducerRecord(topic2, m, m))
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index aa902f2..9783627 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -547,9 +547,10 @@ object TestUtils extends Logging {
acks: Int = -1,
maxBlockMs: Long = 60 * 1000L,
bufferSize: Long = 1024L * 1024L,
- retries: Int = 0,
+ retries: Int = Int.MaxValue,
+ deliveryTimeoutMs: Int = 30 * 1000,
lingerMs: Int = 0,
- requestTimeoutMs: Int = 30 * 1000,
+ requestTimeoutMs: Int = 20 * 1000,
securityProtocol: SecurityProtocol =
SecurityProtocol.PLAINTEXT,
trustStoreFile: Option[File] = None,
saslProperties: Option[Properties] = None,
@@ -563,13 +564,10 @@ object TestUtils extends Logging {
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs.toString)
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
+ producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG,
deliveryTimeoutMs.toString)
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,
requestTimeoutMs.toString)
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs.toString)
- // In case of overflow set maximum possible value for deliveryTimeoutMs
- val deliveryTimeoutMs = if (lingerMs + requestTimeoutMs < 0) Int.MaxValue
else lingerMs + requestTimeoutMs
- producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG,
deliveryTimeoutMs.toString)
-
/* Only use these if not already set */
val defaultProps = Map(
ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> "100",
@@ -993,7 +991,7 @@ object TestUtils extends Logging {
compressionType: CompressionType =
CompressionType.NONE): Unit = {
val props = new Properties()
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType.name)
- val producer =
createProducer(TestUtils.getBrokerListStrFromServers(servers), retries = 5,
acks = acks)
+ val producer =
createProducer(TestUtils.getBrokerListStrFromServers(servers), acks = acks)
try {
val futures = records.map(producer.send)
futures.foreach(_.get)
@@ -1017,10 +1015,7 @@ object TestUtils extends Logging {
}
def produceMessage(servers: Seq[KafkaServer], topic: String, message:
String) {
- val producer = createProducer(
- TestUtils.getBrokerListStrFromServers(servers),
- retries = 5
- )
+ val producer =
createProducer(TestUtils.getBrokerListStrFromServers(servers))
producer.send(new ProducerRecord(topic, topic.getBytes,
message.getBytes)).get
producer.close()
}
@@ -1273,19 +1268,17 @@ object TestUtils extends Logging {
transactionTimeoutMs: Long = 60000) = {
val props = new Properties()
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
- props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5")
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize.toString)
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
transactionTimeoutMs.toString)
- TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
retries = Integer.MAX_VALUE, acks = -1, props = Some(props))
+ TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
acks = -1, props = Some(props))
}
// Seeds the given topic with records with keys and values in the range
[0..numRecords)
def seedTopicWithNumberedRecords(topic: String, numRecords: Int, servers:
Seq[KafkaServer]): Unit = {
val props = new Properties()
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
- val producer =
TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
- retries = Integer.MAX_VALUE, acks = -1, props = Some(props))
+ val producer =
TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers), acks =
-1, props = Some(props))
try {
for (i <- 0 until numRecords) {
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic,
asBytes(i.toString), asBytes(i.toString)))
diff --git a/docs/upgrade.html b/docs/upgrade.html
index ac1388e..264e26d 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -35,6 +35,10 @@
which sets an upper bound on the total time between sending a record
and receiving acknowledgement from the broker. By default,
the delivery timeout is set to 2 minutes.
</li>
+ <li>By default, MirrorMaker now overrides <code>delivery.timeout.ms</code>
to <code>Integer.MAX_VALUE</code> when
+ configuring the producer. If you have overridden the value of
<code>retries</code> in order to fail faster,
+ you will instead need to override <code>delivery.timeout.ms</code>.
+ </li>
</ol>
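For a MirrorMaker deployment that had lowered retries in order to fail fast, the equivalent override now targets the delivery timeout. Shown here as a Java sketch of the producer properties; in practice the same key/value pair goes into the file passed via --producer.config (the 30000 ms value is illustrative):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class FailFastMirrorMakerProducer {
        public static Properties overrides() {
            Properties props = new Properties();
            // Previously: props.setProperty(ProducerConfig.RETRIES_CONFIG, "1");
            props.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "30000"); // give up after 30 seconds
            return props;
        }
    }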
diff --git
a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
index 6a09cab..6ddaf92 100644
---
a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
+++
b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
@@ -34,6 +34,7 @@ import java.util.concurrent.Future;
import static
org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+import static
org.apache.kafka.clients.producer.ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
import static
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
@@ -64,8 +65,9 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
private String clientJaasConfPath;
private String kerb5ConfPath;
- private int retries;
- private int requiredNumAcks = Integer.MAX_VALUE;
+ private int retries = Integer.MAX_VALUE;
+ private int requiredNumAcks = 1;
+ private int deliveryTimeoutMs = 120000;
private boolean syncSend;
private Producer<byte[], byte[]> producer;
@@ -97,6 +99,14 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
this.retries = retries;
}
+ public int getDeliveryTimeoutMs() {
+ return deliveryTimeoutMs;
+ }
+
+ public void setDeliveryTimeoutMs(int deliveryTimeoutMs) {
+ this.deliveryTimeoutMs = deliveryTimeoutMs;
+ }
+
public String getCompressionType() {
return compressionType;
}
@@ -205,10 +215,11 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
throw new ConfigException("Topic must be specified by the Kafka
log4j appender");
if (compressionType != null)
props.put(COMPRESSION_TYPE_CONFIG, compressionType);
- if (requiredNumAcks != Integer.MAX_VALUE)
- props.put(ACKS_CONFIG, Integer.toString(requiredNumAcks));
- if (retries > 0)
- props.put(RETRIES_CONFIG, retries);
+
+ props.put(ACKS_CONFIG, Integer.toString(requiredNumAcks));
+ props.put(RETRIES_CONFIG, retries);
+ props.put(DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs);
+
if (securityProtocol != null) {
props.put(SECURITY_PROTOCOL_CONFIG, securityProtocol);
}
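With this change the appender always sets acks (default 1), retries (default Integer.MAX_VALUE), and delivery.timeout.ms (default 120000) on its producer, and the timeout is exposed through the new setter. A minimal sketch of setting it programmatically; in normal use the value would come from the appender's log4j configuration instead:

    import org.apache.kafka.log4jappender.KafkaLog4jAppender;

    public class AppenderTimeoutSketch {
        public static void main(String[] args) {
            KafkaLog4jAppender appender = new KafkaLog4jAppender();
            // New in this patch; bounds how long a log record may spend in the producer.
            appender.setDeliveryTimeoutMs(60000);
            // brokerList, topic, etc. are configured via the usual log4j properties
            // before the appender is activated.
        }
    }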
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 54fcbc0..e9fe3c4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -98,10 +98,11 @@ import static
org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
* StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
* }</pre>
*
- * When increasing both {@link ProducerConfig#RETRIES_CONFIG} and {@link
ProducerConfig#MAX_BLOCK_MS_CONFIG} to be more resilient to non-available
brokers you should also
- * consider increasing {@link ConsumerConfig#MAX_POLL_INTERVAL_MS_CONFIG}
using the following guidance:
+ *
+ * When increasing {@link ProducerConfig#MAX_BLOCK_MS_CONFIG} to be more
resilient to non-available brokers you should also
+ * increase {@link ConsumerConfig#MAX_POLL_INTERVAL_MS_CONFIG} using the
following guidance:
* <pre>
- * max.poll.interval.ms > min ( max.block.ms, (retries +1) *
request.timeout.ms )
+ * max.poll.interval.ms > max.block.ms
* </pre>
*
*
@@ -687,15 +688,13 @@ public class StreamsConfig extends AbstractConfig {
static {
final Map<String, Object> tempProducerDefaultOverrides = new
HashMap<>();
tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG,
"100");
- tempProducerDefaultOverrides.put(ProducerConfig.RETRIES_CONFIG, 10);
-
PRODUCER_DEFAULT_OVERRIDES =
Collections.unmodifiableMap(tempProducerDefaultOverrides);
}
private static final Map<String, Object> PRODUCER_EOS_OVERRIDES;
static {
final Map<String, Object> tempProducerDefaultOverrides = new
HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
- tempProducerDefaultOverrides.put(ProducerConfig.RETRIES_CONFIG,
Integer.MAX_VALUE);
+
tempProducerDefaultOverrides.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG,
Integer.MAX_VALUE);
tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
true);
PRODUCER_EOS_OVERRIDES =
Collections.unmodifiableMap(tempProducerDefaultOverrides);
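Under exactly-once processing, Streams now pins the embedded producer's delivery.timeout.ms to Integer.MAX_VALUE instead of pinning retries. Applications that want a tighter bound can still override it through producerPrefix(); a minimal sketch (application id, bootstrap servers, and the 300000 ms value are illustrative):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class StreamsDeliveryTimeoutOverride {
        public static Properties config() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Bound the embedded producer's total send time to 5 minutes.
            props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), 300000);
            return props;
        }
    }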
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index cdd4d09..9755334 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -456,9 +456,7 @@ public class StreamsConfigTest {
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "anyValue");
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs =
streamsConfig.getMainConsumerConfigs("groupId", "clientId");
- String isoLevel = (String)
consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG);
- String name = READ_COMMITTED.name();
- assertThat((String)
consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG),
equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
+ assertThat(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG),
equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
}
@Test
@@ -466,7 +464,7 @@ public class StreamsConfigTest {
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs =
streamsConfig.getMainConsumerConfigs("groupId", "clientrId");
- assertThat((String)
consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG),
equalTo(READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT)));
+ assertThat(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG),
equalTo(READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT)));
}
@@ -484,7 +482,7 @@ public class StreamsConfigTest {
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> producerConfigs =
streamsConfig.getProducerConfigs("clientId");
- assertThat((Boolean)
producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), equalTo(false));
+
assertThat(producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
equalTo(false));
}
@Test
@@ -495,9 +493,9 @@ public class StreamsConfigTest {
final Map<String, Object> consumerConfigs =
streamsConfig.getMainConsumerConfigs("groupId", "clientId");
final Map<String, Object> producerConfigs =
streamsConfig.getProducerConfigs("clientId");
- assertThat((String)
consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG),
equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
+ assertThat(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG),
equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
assertTrue((Boolean)
producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
- assertThat((Integer)
producerConfigs.get(ProducerConfig.RETRIES_CONFIG), equalTo(Integer.MAX_VALUE));
+
assertThat(producerConfigs.get(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG),
equalTo(Integer.MAX_VALUE));
assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG),
equalTo(100L));
}
@@ -510,7 +508,7 @@ public class StreamsConfigTest {
final Map<String, Object> producerConfigs =
streamsConfig.getProducerConfigs("clientId");
- assertThat((Integer)
producerConfigs.get(ProducerConfig.RETRIES_CONFIG), equalTo(numberOfRetries));
+ assertThat(producerConfigs.get(ProducerConfig.RETRIES_CONFIG),
equalTo(numberOfRetries));
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index ff791be..496ba58 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -1065,7 +1065,6 @@ public class QueryableStateIntegrationTest {
final Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
- producerConfig.put(ProducerConfig.RETRIES_CONFIG, 10);
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 654fd03..9301e5f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -262,9 +262,6 @@ public class SimpleBenchmark {
// improve producer throughput
props.put(ProducerConfig.LINGER_MS_CONFIG, 5000);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 128 * 1024);
-
- //TODO remove this config or set to smaller value when KIP-91 is merged
-
props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
60000);
}
private Properties setProduceConsumeProperties(final String clientId) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
index ae53870..2104221 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
@@ -177,8 +177,6 @@ public class YahooBenchmark {
final CountDownLatch latch = new CountDownLatch(1);
parent.setStreamProperties("simple-benchmark-yahoo" + new
Random().nextInt());
- //TODO remove this config or set to smaller value when KIP-91 is merged
-
parent.props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
60000);
final KafkaStreams streams = createYahooBenchmarkStreams(parent.props,
campaignsTopic, eventsTopic, latch, parent.numRecords);
parent.runGenericBenchmark(streams, "Streams Yahoo Performance
[records/latency/rec-sec/MB-sec counted]: ", latch);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index 3c8446c..767c9f1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -80,8 +80,6 @@ public class BrokerCompatibilityTest {
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
timeout);
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
timeout);
streamsProperties.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, timeout
+ 1);
- //TODO remove this config or set to smaller value when KIP-91 is merged
-
streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
60000);
Serde<String> stringSerde = Serdes.String();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
index 2d39d53..6069298 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.tests;
-import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
@@ -119,8 +118,6 @@ public class EosTestClient extends SmokeTestUtil {
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.Integer().getClass());
- //TODO remove this config or set to smaller value when KIP-91 is merged
-
props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
60000);
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, Integer> data = builder.stream("data");
diff --git
a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index 268dd2f..79dfb30 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -105,12 +105,8 @@ public class SmokeTestClient extends SmokeTestUtil {
fullProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
fullProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
fullProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- fullProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
fullProps.put(ProducerConfig.ACKS_CONFIG, "all");
- //TODO remove this config or set to smaller value when KIP-91 is merged
-
fullProps.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
80000);
-
fullProps.putAll(props);
return fullProps;
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 7533fdd..a504333 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -153,10 +153,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
- // the next 2 config values make sure that all records are produced
with no loss and no duplicates
- producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
- producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 80000);
final KafkaProducer<byte[], byte[]> producer = new
KafkaProducer<>(producerProps);
@@ -166,7 +163,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
final ValueList[] data = new ValueList[numKeys];
for (int i = 0; i < numKeys; i++) {
data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
- allData.put(data[i].key, new HashSet<Integer>());
+ allData.put(data[i].key, new HashSet<>());
}
final Random rand = new Random();
diff --git
a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
index 87d27e8..0d74645 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -155,12 +155,6 @@ public class TransactionalMessageCopier {
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "512");
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
- // Multiple inflights means that when there are rolling bounces and
other cluster instability, there is an
- // increased likelihood of having previously tried batch expire in the
accumulator. This is a fatal error
- // for a transaction, causing the copier to exit. To work around this,
we bump the request timeout.
- // We can get rid of this when KIP-91 is merged.
- props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");
-
return new KafkaProducer<>(props);
}