[ https://issues.apache.org/jira/browse/KAFKA-6876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16466434#comment-16466434 ]
Paul Davidson commented on KAFKA-6876:
--------------------------------------

Note I do have a patch ready that simply fails the affected Task to avoid spurious offsets being committed. I'm happy to share this, but I would like to hear other opinions on how this case should be handled.

> Sender exceptions ignored by WorkerSourceTask producer Callback causing data loss
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-6876
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6876
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.11.0.1, 1.1.0
>         Environment: Linux, JDK 8
>            Reporter: Paul Davidson
>            Priority: Major
>
> The producer callback in "WorkerSourceTask" handles exceptions during a send() by logging at ERROR level and continuing. This can lead to offsets being committed for records that were never sent correctly. The records are effectively skipped, leading to data loss in our use case.
> The source code for the Callback "onCompletion()" method suggests this should "basically never happen ... callbacks with exceptions should never be invoked in practice", but we have seen this happen several times in production, especially in near heap-exhaustion situations when the Sender thread generates an exception (often caused by KAFKA-6551).
> From WorkerSourceTask:
> {code:java}
> new Callback() {
>     @Override
>     public void onCompletion(RecordMetadata recordMetadata, Exception e) {
>         if (e != null) {
>             // Given the default settings for zero data loss, this should basically never happen --
>             // between "infinite" retries, indefinite blocking on full buffers, and "infinite" request
>             // timeouts, callbacks with exceptions should never be invoked in practice. If the
>             // user overrode these settings, the best we can do is notify them of the failure via
>             // logging.
>             log.error("{} failed to send record to {}: {}", this, topic, e);
>             log.debug("{} Failed record: {}", this, preTransformRecord);
>         } else {
>             log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
>                     this,
>                     recordMetadata.topic(), recordMetadata.partition(),
>                     recordMetadata.offset());
>             commitTaskRecord(preTransformRecord);
>         }
>         recordSent(producerRecord);
>         counter.completeRecord();
>     }
> }
> {code}
>
> Example of an exception triggering the bug:
> {code:java}
> 2018-04-27 21:14:25,740 [kafka-producer-network-thread | source-23] ERROR o.a.k.c.runtime.WorkerSourceTask - source-23 failed to send record to topic-name: {}
> java.lang.IllegalStateException: Producer is closed forcefully.
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:610)
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:597)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:183)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
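
For illustration only, here is a rough sketch of the "fail the task instead of committing" idea mentioned in the comment. It is not the patch referred to above, which is not shown in this message, and it does not use Kafka classes. The class SendFailureTracker and its methods are hypothetical stand-ins: onCompletion(Exception) plays the role of the producer callback, and commitOffsets() plays the role of the offset commit path.

```java
// Hypothetical helper, not part of Kafka Connect. It remembers the first
// asynchronous send failure so that the offset commit path can refuse to
// commit, rather than logging the error and carrying on.
final class SendFailureTracker {
    private Exception firstFailure;      // first send error seen, or null
    private int committableRecords;      // records acknowledged successfully

    // Stand-in for Callback.onCompletion(metadata, e); only the exception matters here.
    synchronized void onCompletion(Exception e) {
        if (e != null) {
            if (firstFailure == null) {
                firstFailure = e;        // keep the original cause
            }
            return;                      // a failed record is never marked committable
        }
        committableRecords++;
    }

    // Stand-in for the commit path: throws if any send has failed.
    synchronized int commitOffsets() {
        if (firstFailure != null) {
            throw new IllegalStateException(
                    "Send failed; refusing to commit offsets", firstFailure);
        }
        return committableRecords;
    }

    public static void main(String[] args) {
        SendFailureTracker tracker = new SendFailureTracker();
        tracker.onCompletion(null);      // one successful send
        System.out.println("committable records: " + tracker.commitOffsets());

        tracker.onCompletion(new IllegalStateException("Producer is closed forcefully."));
        try {
            tracker.commitOffsets();
        } catch (IllegalStateException ex) {
            System.out.println("commit refused, cause: " + ex.getCause().getMessage());
        }
    }
}
```

In this sketch the failure is surfaced at commit time rather than inside the callback, on the assumption that the callback runs on the producer's network thread and should do as little work as possible. Whether the real task should fail immediately or at the next commit is exactly the kind of question raised in the comment.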