Di Shang created KAFKA-4515:
-------------------------------
Summary: Async producer send not retrying on TimeoutException: Batch Expired
Key: KAFKA-4515
URL: https://issues.apache.org/jira/browse/KAFKA-4515
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 0.9.0.1
Reporter: Di Shang
We are testing broker failure resiliency with a cluster of 3 brokers and a
topic with 5 partitions and 2 replicas. We use the code below to send messages
continuously, then kill one of the brokers to see whether any messages are lost.
{code:title=MyTest.java|borderStyle=solid}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyTest {

    static volatile KafkaProducer<Void, String> producer;

    public static void send(ProducerRecord<Void, String> record) {
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                // handle exception with manual retry
                System.out.println("Error, resending...");
                exception.printStackTrace();
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //send(record); // without this retry, msg would be lost
            } else if (metadata != null) {
                System.out.println("Sent " + record);
            } else {
                System.out.println("No exception and no metadata");
            }
        });
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "...");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("retries", "100000");
        props.put("acks", "1");
        props.put("request.timeout.ms", "1000");
        producer = new KafkaProducer<>(props);

        // Send one numbered message every 100 ms, forever.
        Long i = 1L;
        while (true) {
            ProducerRecord<Void, String> record =
                    new ProducerRecord<>("my-topic", i.toString());
            send(record);
            Thread.sleep(100);
            i++;
        }
    }
}
{code}
We found that when *request.timeout.ms* is set to a small value such as 1000,
killing a broker produces a few TimeoutException: Batch Expired errors in the
send() callback. If we don't handle these with an explicit retry, as in the
code above, those messages are lost.
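For illustration, the explicit retry could be bounded and scheduled off the callback thread rather than sleeping inside it. The following is a minimal sketch under stated assumptions, not Kafka API: `RetryingSender`, `sendWithRetry`, `maxAttempts` and `backoffMs` are hypothetical names, the `Supplier` stands in for a wrapper around producer.send(), and `java.util.concurrent.TimeoutException` stands in for Kafka's own TimeoutException so the sketch has no Kafka dependency.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class RetryingSender {
    // Daemon thread so the scheduler never keeps the JVM alive.
    private static final ScheduledExecutorService SCHEDULER =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "resend-scheduler");
            t.setDaemon(true);
            return t;
        });

    // Runs `attempt` up to maxAttempts times, retrying only on timeouts.
    // `attempt` is a hypothetical stand-in for one async producer.send() call.
    static <T> CompletableFuture<T> sendWithRetry(
            Supplier<CompletableFuture<T>> attempt, int maxAttempts, long backoffMs) {
        CompletableFuture<T> result = new CompletableFuture<>();
        tryOnce(attempt, maxAttempts, backoffMs, 1, result);
        return result;
    }

    private static <T> void tryOnce(Supplier<CompletableFuture<T>> attempt, int maxAttempts,
                                    long backoffMs, int attemptNo, CompletableFuture<T> result) {
        attempt.get().whenComplete((value, error) -> {
            // Unwrap the wrapper that dependent stages may add around the real cause.
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                ? error.getCause() : error;
            if (cause == null) {
                result.complete(value);
            } else if (cause instanceof TimeoutException && attemptNo < maxAttempts) {
                // Reschedule instead of sleeping, so the completing thread is not blocked.
                SCHEDULER.schedule(
                    () -> tryOnce(attempt, maxAttempts, backoffMs, attemptNo + 1, result),
                    backoffMs, TimeUnit.MILLISECONDS);
            } else {
                // Non-timeout error, or attempts exhausted: surface the failure.
                result.completeExceptionally(cause);
            }
        });
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Fake sender: the first two attempts time out, the third succeeds.
        Supplier<CompletableFuture<String>> flaky = () -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            if (calls.incrementAndGet() < 3) {
                f.completeExceptionally(new TimeoutException("Batch Expired"));
            } else {
                f.complete("ok");
            }
            return f;
        };
        System.out.println(sendWithRetry(flaky, 5, 100).join()
            + " after " + calls.get() + " attempts");
    }
}
```

Capping the attempts keeps a permanently unavailable partition from causing an endless resend loop, and a resend of this kind can reorder messages relative to later sends.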
The documentation for *request.timeout.ms* says:
bq. The configuration controls the maximum amount of time the client will wait
for the response of a request. If the response is not received before the
timeout elapses the client will resend the request if necessary or fail the
request if retries are exhausted.
This makes me think that a TimeoutException should be retried implicitly
according to the *retries* option, but that does not seem to happen.
Strangely, we also noticed that if *request.timeout.ms* is long enough, such as
the default 30000, we don't lose any messages when killing a broker, even with
*retries* set to 0.
So it seems to me that the *retries* option does not work in the broker-down
scenario. There seems to be some other internal mechanism that handles broker
failure and message retry, and that mechanism does not work when a
TimeoutException occurs.
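The suspected mechanism can be pictured with a toy model. It is only an assumption, not taken from the producer source: it supposes that a batch still waiting in the producer's buffer (for example because its partition has no known leader) is expired once it has waited longer than *request.timeout.ms*, and that *retries* is consulted only for requests that were actually sent. `BatchExpiryModel`, `Outcome` and `step` are hypothetical names, and the 5000 ms leaderless window is made up for the example.

```java
class BatchExpiryModel {
    enum Outcome { WAITING, EXPIRED, SENT, RETRY, FAILED }

    // Decides what happens to one batch under the assumed model.
    // attemptsMade counts send attempts so far, including a failed one.
    static Outcome step(boolean leaderKnown, long waitedMs, long requestTimeoutMs,
                        boolean sendFailed, int attemptsMade, int retries) {
        if (!leaderKnown) {
            // Never sent: only the wait time matters, retries is not consulted.
            return waitedMs > requestTimeoutMs ? Outcome.EXPIRED : Outcome.WAITING;
        }
        if (!sendFailed) {
            return Outcome.SENT;
        }
        // Sent and failed: this is the only path where retries applies.
        return attemptsMade <= retries ? Outcome.RETRY : Outcome.FAILED;
    }

    public static void main(String[] args) {
        // Short timeout, huge retries: the batch still expires while waiting.
        System.out.println(step(false, 5000, 1000, false, 0, 100000));
        // Long timeout, retries = 0: the batch simply keeps waiting...
        System.out.println(step(false, 5000, 30000, false, 0, 0));
        // ...and is sent once a leader is known again.
        System.out.println(step(true, 5000, 30000, false, 0, 0));
    }
}
```

Under these assumptions the model reproduces both observations: messages are lost with a 1000 ms timeout regardless of *retries*, and none are lost with a 30000 ms timeout even when *retries* is 0.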