[ https://issues.apache.org/jira/browse/KAFKA-5731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jason Gustafson resolved KAFKA-5731.
------------------------------------
Resolution: Fixed
Fix Version/s: (was: 1.0.0)
Issue resolved by pull request 3672
[https://github.com/apache/kafka/pull/3672]
> Connect WorkerSinkTask out of order offset commit can lead to inconsistent
> state
> --------------------------------------------------------------------------------
>
> Key: KAFKA-5731
> URL: https://issues.apache.org/jira/browse/KAFKA-5731
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Reporter: Jason Gustafson
> Assignee: Randall Hauch
> Fix For: 0.11.0.1
>
>
> In Connect's WorkerSinkTask, we validate sequence numbers to ensure that
> offset commit responses are handled in the right order
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L199).
>
> Unfortunately, for asynchronous commits, the {{lastCommittedOffsets}} field
> is overwritten regardless of this sequence check, as long as the response
> had no error
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L284):
> {code:java}
> OffsetCommitCallback cb = new OffsetCommitCallback() {
>     @Override
>     public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) {
>         if (error == null) {
>             lastCommittedOffsets = offsets;
>         }
>         onCommitCompleted(error, seqno);
>     }
> };
> {code}
> Hence, if a commit response arrives out of order, a stale response can
> replace newer offsets and leave the internal state inconsistent. To fix this,
> we should only overwrite {{lastCommittedOffsets}} after sequence validation,
> as part of the {{onCommitCompleted(...)}} method.
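
For illustration only, the proposed ordering (validate the sequence number first, then update the committed offsets) might look roughly like the sketch below. This is not the code from the pull request: CommitTracker, beginCommit(), and the String/Long map are hypothetical stand-ins chosen so the example runs without Kafka on the classpath, and the three-argument onCommitCompleted(...) is an assumed shape rather than the real signature.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the commit bookkeeping described above.
// Partitions are plain Strings and offsets are Longs to keep the sketch self-contained.
class CommitTracker {
    // Sequence number of the most recently issued commit request.
    private int commitSeqno = 0;
    private Map<String, Long> lastCommittedOffsets = new HashMap<>();

    // Issue a new commit request and return its sequence number.
    int beginCommit() {
        return ++commitSeqno;
    }

    // Handle a commit response. Returns true only if the committed state was updated.
    boolean onCommitCompleted(Exception error, int seqno, Map<String, Long> committedOffsets) {
        if (seqno != commitSeqno) {
            // Response belongs to an older request: leave the state untouched.
            return false;
        }
        if (error != null) {
            // Current request failed: nothing was committed.
            return false;
        }
        // Only now, after sequence validation, overwrite the committed offsets.
        lastCommittedOffsets = new HashMap<>(committedOffsets);
        return true;
    }

    Map<String, Long> lastCommittedOffsets() {
        return lastCommittedOffsets;
    }

    public static void main(String[] args) {
        CommitTracker tracker = new CommitTracker();
        int first = tracker.beginCommit();
        int second = tracker.beginCommit();

        Map<String, Long> olderOffsets = new HashMap<>();
        olderOffsets.put("topic-0", 10L);
        Map<String, Long> newerOffsets = new HashMap<>();
        newerOffsets.put("topic-0", 20L);

        // The response to the second request arrives before the first one.
        tracker.onCommitCompleted(null, second, newerOffsets);
        tracker.onCommitCompleted(null, first, olderOffsets);

        // The late response to the first request did not replace the newer offsets.
        System.out.println(tracker.lastCommittedOffsets());
    }
}
```

In this sketch the callback would pass the offsets through instead of assigning the field itself, so a late response to an earlier request cannot replace the offsets recorded for a newer one.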