[ https://issues.apache.org/jira/browse/KAFKA-4515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15734702#comment-15734702 ]
huxi commented on KAFKA-4515:
-----------------------------
Did you see many "Got error produce response with correlation id {} on
topic-partition {}, retrying ({} attempts left). Error" entries in the
producer log with retries enabled?
> Async producer send not retrying on TimeoutException: Batch Expired
> -------------------------------------------------------------------
>
> Key: KAFKA-4515
> URL: https://issues.apache.org/jira/browse/KAFKA-4515
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 0.9.0.1
> Reporter: Di Shang
>
> We are testing broker-failure resiliency. We have a cluster of 3 brokers and
> a topic with 5 partitions and 2 replicas. The replicas are evenly distributed,
> and every broker leads at least one partition. We use the following code to
> continuously send messages, then kill one of the brokers to see whether any
> messages are lost.
> {code:title=MyTest.java|borderStyle=solid}
> import java.util.Properties;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> public class MyTest {
>     static volatile KafkaProducer<Void, String> producer;
>
>     public static void send(ProducerRecord<Void, String> record) {
>         producer.send(record, (metadata, exception) -> {
>             if (exception != null) {
>                 // handle exception with manual retry
>                 System.out.println("Error, resending...");
>                 exception.printStackTrace();
>                 try {
>                     // Note: callbacks run on the producer's I/O thread, so this sleep delays other sends.
>                     Thread.sleep(100);
>                 } catch (InterruptedException e) {
>                     Thread.currentThread().interrupt(); // preserve the interrupt status
>                 }
>                 //send(record); // without this retry, the message would be lost
>             } else if (metadata != null) {
>                 System.out.println("Sent " + record);
>             } else {
>                 System.out.println("No exception and no metadata");
>             }
>         });
>     }
>
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "...");
>         props.put("key.serializer",
>                 "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("value.serializer",
>                 "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("retries", "100000");
>         props.put("acks", "1");
>         props.put("request.timeout.ms", "1000");
>         producer = new KafkaProducer<>(props);
>         long i = 1L;
>         while (true) {
>             // Records have no key; the value is an increasing sequence number.
>             ProducerRecord<Void, String> record =
>                     new ProducerRecord<>("my-topic", String.valueOf(i));
>             send(record);
>             Thread.sleep(100);
>             i++;
>         }
>     }
> }
> {code}
> We found that when *request.timeout.ms* is set to a small value such as
> 1000, killing a broker produces a few "TimeoutException: Batch Expired"
> errors in the send() callback. Unless we handle these with an explicit retry
> in the callback, as in the code above, those messages are lost.
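The explicit retry in the callback (the commented-out send(record) call) resends on every error with no limit, so a record that keeps failing is never given up on. Below is a minimal sketch of a bounded variant, under stated assumptions: AsyncSender, RetryingSender, and maxAttempts are hypothetical names; the fake AsyncSender stands in for KafkaProducer.send(record, callback); and java.util.concurrent.TimeoutException stands in for Kafka's org.apache.kafka.common.errors.TimeoutException so the example runs without the Kafka client on the classpath.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

public class RetryingSenderDemo {

    // Hypothetical stand-in for KafkaProducer.send(record, callback): the
    // callback receives null on success or the exception on failure.
    interface AsyncSender {
        void send(String value, Consumer<Exception> callback);
    }

    // Resends a value after a timeout until maxAttempts sends have been made,
    // then records it as dropped instead of retrying forever.
    static final class RetryingSender {
        private final AsyncSender delegate;
        private final int maxAttempts;
        final List<String> delivered = new ArrayList<>();
        final List<String> dropped = new ArrayList<>();

        RetryingSender(AsyncSender delegate, int maxAttempts) {
            this.delegate = delegate;
            this.maxAttempts = maxAttempts;
        }

        void send(String value) {
            send(value, 1);
        }

        private void send(String value, int attempt) {
            delegate.send(value, exception -> {
                if (exception == null) {
                    delivered.add(value);
                } else if (exception instanceof TimeoutException && attempt < maxAttempts) {
                    // Resend immediately instead of sleeping inside the callback.
                    send(value, attempt + 1);
                } else {
                    dropped.add(value);
                }
            });
        }
    }

    public static void main(String[] args) {
        // Fake sender that times out the first two sends of each value.
        Map<String, Integer> calls = new HashMap<>();
        AsyncSender flaky = (value, callback) -> {
            int n = calls.merge(value, 1, Integer::sum);
            callback.accept(n <= 2 ? new TimeoutException("Batch Expired") : null);
        };

        RetryingSender patient = new RetryingSender(flaky, 3);
        patient.send("1"); // the third attempt succeeds
        RetryingSender impatient = new RetryingSender(flaky, 2);
        impatient.send("2"); // gives up after the second timeout

        // prints "delivered=[1] dropped=[2]"
        System.out.println("delivered=" + patient.delivered + " dropped=" + impatient.dropped);
    }
}
```

Only timeouts are resent here; other exceptions are dropped at once, which is a design choice of this sketch rather than anything the report prescribes.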
> The documentation for *request.timeout.ms* says:
> bq. The configuration controls the maximum amount of time the client will
> wait for the response of a request. If the response is not received before
> the timeout elapses the client will resend the request if necessary or fail
> the request if retries are exhausted.
> This makes me think that a TimeoutException should be retried implicitly
> according to the *retries* option, but that does not seem to happen.
> Strangely, we also noticed that if *request.timeout.ms* is long enough, such
> as the default 30000, we lose no messages when killing a broker, even with
> *retries* set to 0.
> So it seems to me that the *retries* option does not take effect in the
> broker-down scenario. There seems to be some other internal mechanism for
> handling broker failure and message retry, and that mechanism breaks down
> when a TimeoutException occurs.
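The two failure paths suspected in the report can be illustrated with a toy model. This is a hypothetical simplification, not the producer's actual code, and FailurePathsModel, Batch, onErrorResponse, and onBatchExpired are invented names. It assumes, as the report suggests, that an error response from a broker is retried while the *retries* budget lasts (the path behind the "retrying ({} attempts left)" log line), whereas a batch that expires fails immediately with "Batch Expired" regardless of *retries*.

```java
import java.util.ArrayList;
import java.util.List;

public class FailurePathsModel {

    // Hypothetical, simplified batch; not the producer's real batch class.
    static final class Batch {
        final String name;
        int attemptsLeft;          // starts at the configured "retries" value
        String outcome = "pending";

        Batch(String name, int retries) {
            this.name = name;
            this.attemptsLeft = retries;
        }
    }

    // Batches handed back for another send attempt.
    final List<Batch> requeued = new ArrayList<>();

    // Path 1: the broker replied with a retriable error, so the retry
    // budget is consulted before giving up.
    void onErrorResponse(Batch batch) {
        if (batch.attemptsLeft > 0) {
            batch.attemptsLeft--;
            batch.outcome = "retrying";
            requeued.add(batch);
        } else {
            batch.outcome = "failed";
        }
    }

    // Path 2: the batch waited longer than the timeout and is expired.
    // In this model the retry budget is never consulted.
    void onBatchExpired(Batch batch) {
        batch.outcome = "failed: Batch Expired";
    }

    public static void main(String[] args) {
        FailurePathsModel model = new FailurePathsModel();
        Batch a = new Batch("a", 100000); // retries=100000, as in the report
        Batch b = new Batch("b", 100000);
        model.onErrorResponse(a);
        model.onBatchExpired(b);
        // prints "a: retrying (99999 attempts left)" then "b: failed: Batch Expired"
        System.out.println(a.name + ": " + a.outcome + " (" + a.attemptsLeft + " attempts left)");
        System.out.println(b.name + ": " + b.outcome);
    }
}
```

Under this model, raising *retries* (as the reporter did with 100000) changes nothing for expired batches, which is consistent with the losses observed at *request.timeout.ms* = 1000.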
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)