Paul Davidson created KAFKA-6876:
------------------------------------

             Summary: Sender exceptions ignored by WorkerSourceTask producer Callback causing data loss
                 Key: KAFKA-6876
                 URL: https://issues.apache.org/jira/browse/KAFKA-6876
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
    Affects Versions: 1.1.0, 0.11.0.1
         Environment: Linux, JDK 8
            Reporter: Paul Davidson

The producer callback in "WorkerSourceTask" handles exceptions during a send() by logging at ERROR level and continuing. This can lead to offsets being committed for records that were never sent correctly. The records are effectively skipped, leading to data loss in our use case.

The source code for the Callback "onCompletion()" method suggests this should "basically never happen ... callbacks with exceptions should never be invoked in practice", but we have seen this happen several times in production, especially in near heap-exhaustion situations when the Sender thread generates an exception (often caused by KAFKA-6551).

From WorkerSourceTask:
{code:java}
new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            // Given the default settings for zero data loss, this should basically never happen --
            // between "infinite" retries, indefinite blocking on full buffers, and "infinite" request
            // timeouts, callbacks with exceptions should never be invoked in practice. If the
            // user overrode these settings, the best we can do is notify them of the failure via
            // logging.
            log.error("{} failed to send record to {}: {}", this, topic, e);
            log.debug("{} Failed record: {}", this, preTransformRecord);
        } else {
            log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
                    this,
                    recordMetadata.topic(), recordMetadata.partition(),
                    recordMetadata.offset());
            commitTaskRecord(preTransformRecord);
        }
        recordSent(producerRecord);
        counter.completeRecord();
    }
}
{code}

Example of an exception triggering the bug:
{code:java}
2018-04-27 21:14:25,740 [kafka-producer-network-thread | source-23] ERROR o.a.k.c.runtime.WorkerSourceTask - source-23 failed to send record to topic-name: {}
java.lang.IllegalStateException: Producer is closed forcefully.
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:610)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:597)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:183)
    at java.lang.Thread.run(Thread.java:748)
{code}
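
To illustrate the behaviour this report is asking for, here is a minimal sketch of a callback that remembers a send failure instead of only logging it, so that the commit path can refuse to advance offsets. It is not Kafka Connect code: the class "SendFailureTracker" and its methods are hypothetical names made up for this example, and "onCompletion(Exception)" only mirrors the shape of the producer Callback (a null exception means success).

```java
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical sketch, not Kafka Connect code: record the first send failure
 * reported to a producer-style callback so the commit path can stop before
 * committing offsets for records that were never written.
 */
public class SendFailureTracker {
    // First exception seen by any callback; stays null while every send succeeds.
    private final AtomicReference<Exception> firstFailure = new AtomicReference<>();

    /** Mirrors the shape of a producer callback: e is null on success. */
    public void onCompletion(Exception e) {
        if (e != null) {
            // Keep only the first failure; later ones are often consequences of it.
            firstFailure.compareAndSet(null, e);
        }
    }

    /** True only while no send has failed. */
    public boolean safeToCommit() {
        return firstFailure.get() == null;
    }

    /** Intended to be called from the task thread before committing offsets. */
    public void throwIfFailed() {
        Exception e = firstFailure.get();
        if (e != null) {
            throw new IllegalStateException("Unrecoverable send failure", e);
        }
    }

    public static void main(String[] args) {
        SendFailureTracker tracker = new SendFailureTracker();
        tracker.onCompletion(null); // a successful send
        System.out.println("safe after success: " + tracker.safeToCommit());
        tracker.onCompletion(new IllegalStateException("Producer is closed forcefully."));
        System.out.println("safe after failure: " + tracker.safeToCommit());
    }
}
```

The AtomicReference is used because the callback runs on the producer network thread while the check would run on the task thread. Whether the task should fail outright or retry the record is a separate design decision that this sketch does not address.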