Paul Davidson created KAFKA-6876:
------------------------------------
Summary: Sender exceptions ignored by WorkerSourceTask producer Callback causing data loss
Key: KAFKA-6876
URL: https://issues.apache.org/jira/browse/KAFKA-6876
Project: Kafka
Issue Type: Bug
Components: KafkaConnect
Affects Versions: 1.1.0, 0.11.0.1
Environment: Linux, JDK 8
Reporter: Paul Davidson
The producer callback in "WorkerSourceTask" handles exceptions from send() by
logging them at ERROR level and continuing. This can lead to offsets being
committed for records that were never successfully sent. Those records are
effectively skipped, which means data loss in our use case.
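To make the failure mode concrete, here is a minimal self-contained Java model. It is not Kafka code, and every name in it (`LogAndContinueTask`, `send`, `commit`) is hypothetical. It assumes the task stages a record's source offset before the send completes, and that the callback marks the record complete whether or not the send succeeded, as the quoted callback does with `recordSent(...)` and `counter.completeRecord()`. Under those assumptions, the next commit moves past a record whose send failed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "log and continue"; not Kafka code.
class LogAndContinueTask {
    private long stagedOffset = -1;     // offset staged before each send
    private long committedOffset = -1;  // last offset "committed"
    private int outstanding = 0;        // sends still awaiting a callback
    final List<String> errors = new ArrayList<>();

    // Stage the offset first, then "send". The callback is invoked synchronously
    // here, receiving either an exception or null.
    void send(long offset, Exception sendError) {
        stagedOffset = offset;
        outstanding++;
        onCompletion(offset, sendError);
    }

    private void onCompletion(long offset, Exception e) {
        if (e != null) {
            // Mirrors the quoted callback: record (log) the failure and carry on.
            errors.add("failed to send record at offset " + offset + ": " + e.getMessage());
        }
        // The record counts as complete either way, so nothing blocks the next commit.
        outstanding--;
    }

    // Commit whatever offset is staged once no sends are outstanding.
    boolean commit() {
        if (outstanding > 0) {
            return false;
        }
        committedOffset = stagedOffset;
        return true;
    }

    long committedOffset() {
        return committedOffset;
    }
}
```

In this model, the send at offset 1 fails and offset 1 is still committed, so a restart would resume past that record. That is the data loss described above.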
The comment in the Callback's "onCompletion()" method says this should
"basically never happen ... callbacks with exceptions should never be invoked
in practice", but we have seen it happen several times in production,
especially when the heap is close to exhaustion and the Sender thread throws
an exception (often caused by KAFKA-6551).
From WorkerSourceTask:
{code:java}
new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            // Given the default settings for zero data loss, this should basically never happen --
            // between "infinite" retries, indefinite blocking on full buffers, and "infinite" request
            // timeouts, callbacks with exceptions should never be invoked in practice. If the
            // user overrode these settings, the best we can do is notify them of the failure via
            // logging.
            log.error("{} failed to send record to {}: {}", this, topic, e);
            log.debug("{} Failed record: {}", this, preTransformRecord);
        } else {
            log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
                    this,
                    recordMetadata.topic(), recordMetadata.partition(),
                    recordMetadata.offset());
            commitTaskRecord(preTransformRecord);
        }
        recordSent(producerRecord);
        counter.completeRecord();
    }
}
{code}
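One possible alternative is to fail fast instead of logging and continuing. The callback stores the first send exception in an `AtomicReference`, and the task rethrows it before sending or committing anything else. The sketch below is a hypothetical, self-contained model, not the actual Kafka change. The class name, method names and the use of `IllegalStateException` are all assumptions made for illustration.

{code:java}
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical fail-fast model; not Kafka code.
class FailFastTask {
    // First exception reported by a send callback. In a real producer the callback
    // runs on the network thread, so it is held in an AtomicReference.
    private final AtomicReference<Exception> sendException = new AtomicReference<>();
    private long stagedOffset = -1;
    private long committedOffset = -1;
    private int outstanding = 0;

    synchronized void send(long offset, Exception sendError) {
        maybeThrowSendException();  // refuse new work once a send has failed
        stagedOffset = offset;
        outstanding++;
        onCompletion(sendError);    // invoked synchronously in this model
    }

    private synchronized void onCompletion(Exception e) {
        if (e != null) {
            // Keep only the first failure; later ones are often consequences of it.
            sendException.compareAndSet(null, e);
        }
        outstanding--;
    }

    synchronized boolean commit() {
        maybeThrowSendException();  // never commit offsets past a failed send
        if (outstanding > 0) {
            return false;
        }
        committedOffset = stagedOffset;
        return true;
    }

    private void maybeThrowSendException() {
        Exception e = sendException.get();
        if (e != null) {
            throw new IllegalStateException("A previous send failed; refusing to continue", e);
        }
    }

    long committedOffset() {
        return committedOffset;
    }
}
{code}

In this model, the task stops with the last good offset (0) still committed, so a restart would re-read the failed record. This gives at-least-once delivery instead of silently skipping the record.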
Example of an exception triggering the bug:
{code:java}
2018-04-27 21:14:25,740 [kafka-producer-network-thread | source-23] ERROR o.a.k.c.runtime.WorkerSourceTask - source-23 failed to send record to topic-name: {}
java.lang.IllegalStateException: Producer is closed forcefully.
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:610)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:597)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:183)
    at java.lang.Thread.run(Thread.java:748)
{code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)