Increasing reconnect.backoff.ms to 1000 ms and setting
BLOCK_ON_BUFFER_FULL_CONFIG to true did not help either. The messages are
simply lost.
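
For reference, a minimal sketch of how these settings can be passed to the
producer as plain properties. The broker address is a placeholder assumption
and the class name ProducerSettings is hypothetical; the remaining values are
the ones quoted in this thread. "block.on.buffer.full" is the property key
behind BLOCK_ON_BUFFER_FULL_CONFIG.

```java
import java.util.Properties;

// Sketch only: collects the producer settings discussed in this thread.
// ProducerSettings is a hypothetical helper, not part of the Kafka client.
class ProducerSettings {
    static Properties build() {
        Properties props = new Properties();
        // Assumption: placeholder broker address, not taken from the thread.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "1");
        props.put("retries", "10000000");
        props.put("retry.backoff.ms", "100");
        props.put("request.timeout.ms", "5000");
        // The two settings changed in this message:
        props.put("reconnect.backoff.ms", "1000");
        props.put("block.on.buffer.full", "true");
        return props;
    }
}
```

The resulting Properties object would be handed to the KafkaProducer
constructor together with the key and value serializer settings.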

I was upset to find that there is no way to handle messages that are lost
when the broker itself is unavailable, and that retries do not cover broker
connection issues.
https://issues.apache.org/jira/browse/KAFKA-156
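
One possible workaround on the application side, sketched under assumptions:
give each send its own callback object that holds a reference to the record,
so a failed record can be parked in a queue and re-sent or persisted later.
This does not change the retry behavior itself. CompletionCallback below is a
hypothetical stand-in shaped like the client's Callback.onCompletion(
RecordMetadata, Exception), with the metadata type simplified so the sketch
compiles without the Kafka jar; RecordKeepingCallback is also a made-up name.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for org.apache.kafka.clients.producer.Callback.
// The metadata parameter is typed as Object to avoid the Kafka dependency.
interface CompletionCallback {
    void onCompletion(Object metadata, Exception exception);
}

// Callback that remembers the record it was created for, so the record is
// still available when the send fails and no metadata comes back.
class RecordKeepingCallback<R> implements CompletionCallback {
    private final R record;
    private final Queue<R> failed;

    RecordKeepingCallback(R record, Queue<R> failed) {
        this.record = record;
        this.failed = failed;
    }

    @Override
    public void onCompletion(Object metadata, Exception exception) {
        if (exception != null) {
            // The send failed after the producer gave up: keep the record
            // so the application can re-send or persist it.
            failed.add(record);
        }
    }
}

class FailedRecordDemo {
    public static void main(String[] args) {
        Queue<String> failed = new ConcurrentLinkedQueue<>();
        RecordKeepingCallback<String> cb =
                new RecordKeepingCallback<>("value-1", failed);
        // Simulate the producer reporting an expired batch.
        cb.onCompletion(null, new RuntimeException("Batch Expired"));
        System.out.println("records to re-send: " + failed.size());
    }
}
```

With the real client the same class would implement Callback and be passed as
producer.send(record, new RecordKeepingCallback<>(record, failed)).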

Slide 24 of
http://www.slideshare.net/jhols1/apache-kafka-reliability-guarantees-stratahadoop-nyc-2015
also shows that retries happen only if the drain response fails, but does
that include a missing or null response?

Let me try modifying some of these classes.




On Mon, Jun 13, 2016 at 8:32 PM, R Krishna <krishna...@gmail.com> wrote:

> As part of testing Kafka v0.9 at-least-once guarantees, we tried
> disconnecting the producer's network and found that the configured
> retries (retries=10000000) are not happening. We get a
>
> WARN  kafka-producer-network-thread | producer-1
> [.kafka.clients.producer.internals.Sender]  - Got error produce response
> with correlation id 6474 on topic-partition test-topic-3-100-38, retrying
> (9999999 attempts left). Error: NETWORK_EXCEPTION
>
> And
>
> org.apache.kafka.common.errors.TimeoutException: Batch Expired
>
> When we tried debugging by putting a breakpoint in the Accumulator and
> BatchRecord classes to stop when batch.attempts > 1, it never stopped:
> the value never goes beyond 1, where the batch is re-enqueued, even though
> canRetry() always returns true. Is there a better way to debug this?
> Relevant method:
> clients.producer.internals.Sender.completeBatch(RecordBatch, Errors, long,
> long, long)
>
> The producer skips messages when there is a network issue; we verified
> this by checking topic message counts.
>
> Also, the only option in an async send is a callback on completion, where
> even the RecordMetadata is empty, as expected, because there was no server
> communication. But how do we get the record itself after all the retries
> have happened, so that nothing is lost?
>
> reconnect.backoff.ms = 100
> retry.backoff.ms = 100
> buffer.memory = 33554432
> timeout.ms = 30000
> connections.max.idle.ms = 540000
> max.in.flight.requests.per.connection = 5
> metrics.num.samples = 2
> request.timeout.ms = 5000
> acks = 1
> batch.size = 16384
> receive.buffer.bytes = 32768
> retries = 10000000 <<<<<<<<<<<<<<<<
> max.request.size = 1048576
> metrics.sample.window.ms = 30000
> send.buffer.bytes = 131072
> linger.ms = 10
>
>     /*
>      * Produce a record without waiting for server. This includes a callback
>      * that will print an error if something goes wrong
>      */
>     public static void produceAsync(Producer<String, String> producer,
>             String topic, String key, String value) {
>         ProducerRecord<String, String> record =
>                 new ProducerRecord<String, String>(topic, value);
>         producer.send(record, new DemoProducerCallback());
>     }
>
>     public static class DemoProducerCallback implements Callback {
>         @Override
>         public void onCompletion(RecordMetadata recordMetadata, Exception e) {
>             if (e != null) {
>                 System.out.println("Error producing to topic " +
>                         ((recordMetadata != null) ? recordMetadata.topic() : ""));
>                 e.printStackTrace();
>             }
>         }
>     }
>



-- 
Radha Krishna, Proddaturi
253-234-5657
