[ 
https://issues.apache.org/jira/browse/KAFKA-9982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17118842#comment-17118842
 ] 

Chris Egerton commented on KAFKA-9982:
--------------------------------------

Hi [~q.xu], although offset commit is asynchronous, the framework doesn't 
actually commit offsets for records that haven't been ack'd by the broker yet.

During offset commit, the worker will first [create a snapshot|https://github.com/apache/kafka/blob/1c4eb1a5757df611735cfac9b709e0d80d0da4b3/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L486] 
of the offsets for to-be-committed records, and then [wait for all of those records to be ack'd|https://github.com/apache/kafka/blob/1c4eb1a5757df611735cfac9b709e0d80d0da4b3/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L490-L492] 
by the broker before committing the offsets in that snapshot.
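A minimal, single-threaded sketch of that "snapshot, wait for acks, then commit" ordering, assuming a simplified model where each source partition is a string and its offset a plain long. The class and method names (SnapshotCommitSketch, send, ack, beginCommit, tryFinishCommit) are hypothetical and are not the WorkerSourceTask API. The real worker blocks with a timeout instead of being polled like this.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, single-threaded model of "snapshot offsets, wait for acks, then commit". */
public class SnapshotCommitSketch {
    private long nextId = 0;
    // Latest source offset per source partition for records sent since the last snapshot.
    private final Map<String, Long> offsetsSinceSnapshot = new HashMap<>();
    // Ids of all records handed to the producer but not yet acked by the broker.
    private final Set<Long> unacked = new HashSet<>();
    // The snapshot currently being committed, and the unacked records it still depends on.
    private Map<String, Long> snapshot = null;
    private final Set<Long> snapshotWaitingOn = new HashSet<>();
    // Offsets that have actually been committed.
    private final Map<String, Long> committed = new HashMap<>();

    /** Hand a record to the (simulated) producer; returns its id. */
    public long send(String sourcePartition, long sourceOffset) {
        long id = nextId++;
        unacked.add(id);
        offsetsSinceSnapshot.put(sourcePartition, sourceOffset);
        return id;
    }

    /** Simulated broker acknowledgement of a record. */
    public void ack(long id) {
        unacked.remove(id);
        snapshotWaitingOn.remove(id);
    }

    /** Step 1: freeze the offsets of everything sent so far; later sends are not covered. */
    public void beginCommit() {
        if (snapshot != null) throw new IllegalStateException("commit already in progress");
        snapshot = new HashMap<>(offsetsSinceSnapshot);
        offsetsSinceSnapshot.clear();
        snapshotWaitingOn.addAll(unacked);
    }

    /** Step 2: commit the snapshot only once every record it covers has been acked. */
    public boolean tryFinishCommit() {
        if (snapshot == null || !snapshotWaitingOn.isEmpty()) return false;
        committed.putAll(snapshot);
        snapshot = null;
        return true;
    }

    public Map<String, Long> committed() { return committed; }

    public static void main(String[] args) {
        SnapshotCommitSketch task = new SnapshotCommitSketch();
        long a = task.send("file-1", 10);
        long b = task.send("file-1", 11);
        task.beginCommit();
        task.send("file-1", 12);                     // sent after the snapshot; not covered by it
        task.ack(a);
        System.out.println(task.tryFinishCommit());  // false: record b is not acked yet
        task.ack(b);
        System.out.println(task.tryFinishCommit());  // true
        System.out.println(task.committed());        // {file-1=11}
    }
}
```

The point of the split into two steps is that an offset never becomes committed while a record it covers is still in flight, even though sending and committing run asynchronously.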

There is a small issue here: if the producer fails to send a record with a 
non-retriable exception, the task is failed without committing any of the 
offsets in that snapshot, rather than withholding only the offset for that 
failed record (and possibly for every record after it). But the only downside 
of this is that some records are sent again as duplicates when the task 
restarts, which doesn't compromise the at-least-once delivery guarantees 
already provided by the framework.
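To illustrate why this only produces duplicates and never loss, here is a hypothetical simulation (FailedSnapshotSketch and run are illustrative names, not Connect APIs). It assumes records are sent and acked one at a time, that fixed-size batches stand in for the periodic offset commit, and that the simulated non-retriable error happens only on the first attempt.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation: a failed send discards the whole uncommitted snapshot. */
public class FailedSnapshotSketch {

    /**
     * Sends source offsets 0..total-1, committing after every batchSize acked records.
     * The record at failAt fails once with a simulated non-retriable error, which kills
     * the task; the task then restarts from the last committed offset.
     * Returns every offset the broker received, in order.
     */
    public static List<Integer> run(int total, int batchSize, int failAt) {
        List<Integer> brokerLog = new ArrayList<>();
        int committedNext = 0;      // next source offset to read after a (re)start
        boolean failedOnce = false; // the simulated error happens only on the first attempt

        while (committedNext < total) {
            // (Re)start the task from the last committed offset.
            int position = committedNext;
            boolean crashed = false;
            while (position < total && !crashed) {
                int batchEnd = Math.min(position + batchSize, total);
                for (int offset = position; offset < batchEnd; offset++) {
                    if (offset == failAt && !failedOnce) {
                        failedOnce = true;
                        crashed = true; // task fails; this batch's offsets are never committed
                        break;
                    }
                    brokerLog.add(offset); // record acked by the broker
                }
                if (!crashed) {
                    committedNext = batchEnd; // every record in the snapshot was acked: commit
                    position = batchEnd;
                }
            }
        }
        return brokerLog;
    }

    public static void main(String[] args) {
        System.out.println(run(10, 4, 5)); // [0, 1, 2, 3, 4, 4, 5, 6, 7, 8, 9]
    }
}
```

With 10 records, snapshots of 4 and a failure at offset 5, record 4 was acked before the failure but its snapshot was never committed, so it reaches the broker twice; every offset still reaches the broker at least once.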

Does that clear things up? I'm happy to continue the discussion if you have 
other questions or remarks :)

> [kafka-connect] Source connector does not guarantee at least once delivery
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-9982
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9982
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.5.0
>            Reporter: Qinghui Xu
>            Priority: Major
>
> In the kafka-connect runtime, the WorkerSourceTask is responsible for sending 
> records to the destination topics and managing the source offset commit. 
> Committed offsets are then used later for recovery of tasks during rebalance 
> or restart.
> But there are two concerns when looking into the WorkerSourceTask 
> implementation:
>  * When the producer fails to send records, there is no retry; the offset 
> commit is simply skipped and the next loop iteration (polling for new records) 
> runs.
>  * The offset commit and the actual sending of records over the network are 
> in fact asynchronous, which means the offset commit could happen before the 
> records are received by the brokers, and a rebalance/restart in this gap 
> could lead to message loss.
> The conclusion is thus that the source connector does not support 
> at-least-once semantics by default (unless the plugin implementation makes 
> extra effort itself). I consider this a bug.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
