Paul Davidson created KAFKA-6876:
------------------------------------

             Summary: Sender exceptions ignored by WorkerSourceTask producer Callback causing data loss
                 Key: KAFKA-6876
                 URL: https://issues.apache.org/jira/browse/KAFKA-6876
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
    Affects Versions: 1.1.0, 0.11.0.1
         Environment: Linux, JDK 8
            Reporter: Paul Davidson


The producer callback in "WorkerSourceTask" handles an exception from send() 
by logging it at ERROR level and continuing. Offsets can then be committed 
for records that were never successfully sent, so those records are 
effectively skipped. In our use case this causes data loss.

The source code for the Callback "onCompletion()" method suggests this should 
"basically never happen ... callbacks with exceptions should never be invoked 
in practice", but we have seen this happen several times in production, 
especially in near heap-exhaustion situations when the Sender thread generates 
an exception (often caused by KAFKA-6551).

From WorkerSourceTask:
{code:java}
new Callback() {
   @Override
   public void onCompletion(RecordMetadata recordMetadata, Exception e) {
       if (e != null) {
           // Given the default settings for zero data loss, this should basically never happen --
           // between "infinite" retries, indefinite blocking on full buffers, and "infinite" request
           // timeouts, callbacks with exceptions should never be invoked in practice. If the
           // user overrode these settings, the best we can do is notify them of the failure via
           // logging.
           log.error("{} failed to send record to {}: {}", this, topic, e);
           log.debug("{} Failed record: {}", this, preTransformRecord);
       } else {
           log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
                   this,
                   recordMetadata.topic(), recordMetadata.partition(),
                   recordMetadata.offset());
           commitTaskRecord(preTransformRecord);
       }
       recordSent(producerRecord);
       counter.completeRecord();
   }
}
{code}
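
For illustration, here is a minimal sketch of one way a callback could avoid committing past a failed send: remember the first exception and refuse to commit offsets once one has been seen. It is written without the Kafka client so it runs standalone. `SendFailureSketch`, `SendCallback`, `callbackFor` and `commitOffsets` are hypothetical names, not Kafka Connect APIs, and this is not necessarily how the issue should be fixed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class SendFailureSketch {

    /** Hypothetical stand-in for the producer callback interface. */
    interface SendCallback {
        void onCompletion(Long offset, Exception e);
    }

    // First exception reported by any callback; null while all sends have succeeded.
    private final AtomicReference<Exception> firstSendFailure = new AtomicReference<>();

    // Records acknowledged by the (simulated) producer and therefore safe to commit.
    private final List<String> committable = new ArrayList<>();

    /** Builds a callback that records a failure instead of only logging it. */
    SendCallback callbackFor(String record) {
        return (offset, e) -> {
            if (e != null) {
                // Keep the first failure so the committing thread can see it later.
                firstSendFailure.compareAndSet(null, e);
            } else {
                synchronized (committable) {
                    committable.add(record);
                }
            }
        };
    }

    /** Returns the records whose offsets may be committed, or throws if any send failed. */
    List<String> commitOffsets() {
        Exception failure = firstSendFailure.get();
        if (failure != null) {
            throw new IllegalStateException("Refusing to commit offsets after a failed send", failure);
        }
        synchronized (committable) {
            return new ArrayList<>(committable);
        }
    }

    public static void main(String[] args) {
        SendFailureSketch task = new SendFailureSketch();
        task.callbackFor("record-1").onCompletion(0L, null);
        System.out.println("Committable: " + task.commitOffsets());

        // Simulate the Sender thread failing a batch.
        task.callbackFor("record-2")
            .onCompletion(null, new IllegalStateException("Producer is closed forcefully."));
        try {
            task.commitOffsets();
        } catch (IllegalStateException ex) {
            System.out.println("Commit blocked: " + ex.getCause().getMessage());
        }
    }
}
```

In this sketch the failed record is never added to the committable set, and later commit attempts fail loudly rather than silently skipping it. Whether the task should stop, retry, or surface the error some other way is a separate design decision.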
 

Example of an exception triggering the bug:
{code:java}
2018-04-27 21:14:25,740 [kafka-producer-network-thread | source-23] ERROR o.a.k.c.runtime.WorkerSourceTask - source-23 failed to send record to topic-name: {}
java.lang.IllegalStateException: Producer is closed forcefully.
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:610)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:597)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:183)
        at java.lang.Thread.run(Thread.java:748)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
