[ https://issues.apache.org/jira/browse/KAFKA-6876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Egerton resolved KAFKA-6876.
----------------------------------
    Resolution: Duplicate

> Sender exceptions ignored by WorkerSourceTask producer Callback causing data loss
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-6876
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6876
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.11.0.1, 1.1.0, 2.0.1
>         Environment: Linux, JDK 8
>            Reporter: Paul Davidson
>            Priority: Major
>
> The producer callback in "WorkerSourceTask" handles exceptions during a
> send() by logging them at ERROR level and continuing. This can lead to
> offsets being committed for records that were never successfully sent. The
> records are effectively skipped, which in our use case means data loss.
> The source code for the Callback "onCompletion()" method suggests this should
> "basically never happen ... callbacks with exceptions should never be invoked
> in practice", but we have seen it happen several times in production,
> especially in near-heap-exhaustion situations when the Sender thread
> generates an exception (often caused by KAFKA-6551).
> From WorkerSourceTask line 253:
> {code:java}
> new Callback() {
>     @Override
>     public void onCompletion(RecordMetadata recordMetadata, Exception e) {
>         if (e != null) {
>             // Given the default settings for zero data loss, this should basically never happen --
>             // between "infinite" retries, indefinite blocking on full buffers, and "infinite" request
>             // timeouts, callbacks with exceptions should never be invoked in practice. If the
>             // user overrode these settings, the best we can do is notify them of the failure via
>             // logging.
>             log.error("{} failed to send record to {}: {}", this, topic, e);
>             log.debug("{} Failed record: {}", this, preTransformRecord);
>         } else {
>             log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
>                     this,
>                     recordMetadata.topic(), recordMetadata.partition(),
>                     recordMetadata.offset());
>             commitTaskRecord(preTransformRecord);
>         }
>         recordSent(producerRecord);
>         counter.completeRecord();
>     }
> }
> {code}
>  
> Example of an exception triggering the bug:
> {code:java}
> 2018-04-27 21:14:25,740 [kafka-producer-network-thread | source-23] ERROR o.a.k.c.runtime.WorkerSourceTask - source-23 failed to send record to topic-name: {}
> java.lang.IllegalStateException: Producer is closed forcefully.
>         at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:610)
>         at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:597)
>         at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:183)
>         at java.lang.Thread.run(Thread.java:748)
> {code}
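
In the callback quoted in the report, recordSent(producerRecord) and counter.completeRecord() run whether or not the send failed, so a failed record is treated as finished and its offset can later be committed. One way to avoid this is to have the callback record the failure instead of only logging it, and to have the task thread check for that failure before polling again or committing offsets. Below is a minimal, hypothetical sketch in plain Java. It assumes a simplified callback signature onCompletion(String sourceOffset, Exception e) in place of the real org.apache.kafka.clients.producer.Callback. The class SendFailureTracker and its methods are illustrative names, not Kafka Connect APIs, and the code is not taken from the Kafka codebase. It also does not track in-flight records, which a real task would still need to wait for before committing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper, not part of Kafka Connect: tracks the outcome of
// asynchronous sends so that offsets are not committed once a send has failed.
class SendFailureTracker {
    // First exception reported by a send callback (producer I/O thread), if any.
    private final AtomicReference<Exception> firstFailure = new AtomicReference<>();
    // Source offsets of records the producer reported as successfully written.
    private final List<String> acknowledged = Collections.synchronizedList(new ArrayList<>());

    // Simplified stand-in for Callback.onCompletion(RecordMetadata, Exception):
    // record the failure instead of only logging it.
    void onCompletion(String sourceOffset, Exception e) {
        if (e != null) {
            // Keep the first failure; later ones are often consequences of it.
            firstFailure.compareAndSet(null, e);
        } else {
            acknowledged.add(sourceOffset);
        }
    }

    // Called on the task thread before polling for more records.
    void maybeThrowSendException() {
        Exception e = firstFailure.get();
        if (e != null) {
            throw new IllegalStateException("Unrecoverable exception from producer send callback", e);
        }
    }

    // Called on the task thread before committing offsets. Throws if a send
    // failure has been observed, so the task fails instead of committing.
    List<String> offsetsToCommit() {
        maybeThrowSendException();
        synchronized (acknowledged) {
            return new ArrayList<>(acknowledged);
        }
    }

    public static void main(String[] args) {
        SendFailureTracker tracker = new SendFailureTracker();
        tracker.onCompletion("offset-1", null);
        System.out.println("Safe to commit: " + tracker.offsetsToCommit());

        // Simulate the failure shown in the stack trace above.
        tracker.onCompletion("offset-2", new IllegalStateException("Producer is closed forcefully."));
        try {
            tracker.offsetsToCommit();
        } catch (IllegalStateException e) {
            System.out.println("Task fails instead of committing: " + e.getCause().getMessage());
        }
    }
}
```

This sketch favors correctness over availability: the task stops rather than silently skipping a record, and after a restart it would resume from the last offsets committed before the failure was observed.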



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
