[ https://issues.apache.org/jira/browse/KAFKA-5731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Randall Hauch updated KAFKA-5731:
---------------------------------
Description:
In Connect's WorkerSinkTask, we do sequence number validation to ensure that offset commits are handled in the right order (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L199).

Unfortunately, for asynchronous commits, the {{lastCommittedOffsets}} field is overridden regardless of this sequence check as long as the response had no error (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L284):

{code:java}
OffsetCommitCallback cb = new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) {
        if (error == null) {
            // Problem: the field is overwritten before onCommitCompleted() checks seqno,
            // so a stale (out-of-order) response can replace newer offsets.
            lastCommittedOffsets = offsets;
        }
        onCommitCompleted(error, seqno);
    }
};
{code}

Hence, if we get an out-of-order commit, the internal state will be inconsistent. To fix this, we should only override {{lastCommittedOffsets}} after sequence validation, as part of the {{onCommitCompleted(...)}} method.

was:
In Connect's WorkerSinkTask, we do sequence number validation to ensure that offset commits are handled in the right order (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L199).

Unfortunately, for asynchronous commits, the {{lastCommittedOffsets}} field is overridden regardless of this sequence check as long as the response had no error (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L284). Hence if we get an out of order commit, then the internal state will be inconsistent. To fix this, we should only override {{lastCommittedOffsets}} after sequence validation.
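The fix proposed in the updated description can be sketched roughly as follows. This is a minimal, self-contained sketch, not the actual WorkerSinkTask code. The class {{CommitTracker}}, its methods, and the use of String partitions and Long offsets in place of {{TopicPartition}} and {{OffsetAndMetadata}} are hypothetical simplifications. The point it illustrates is that the callback hands the committed offsets to {{onCommitCompleted(...)}}, and {{lastCommittedOffsets}} is assigned only after the sequence-number check passes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the offset-commit bookkeeping in
// WorkerSinkTask. Partitions are Strings and offsets are Longs so the sketch
// has no Kafka dependency; class and method names are illustrative only.
class CommitTracker {
    // Sequence number of the most recently started commit.
    private long commitSeqno = 0;
    // Offsets from the latest commit that succeeded and passed the seqno check.
    private Map<String, Long> lastCommittedOffsets = new HashMap<>();
    // Number of completion callbacks discarded because a newer commit had started.
    private int outOfOrderCallbacks = 0;

    // Starts a new asynchronous commit and returns its sequence number,
    // which the caller captures for the completion callback.
    long beginCommit() {
        return ++commitSeqno;
    }

    // Called from the commit callback with the offsets that commit carried.
    // Sequence validation happens first; lastCommittedOffsets is updated
    // only for the current commit, and only if it succeeded.
    void onCommitCompleted(Exception error, long seqno, Map<String, Long> committedOffsets) {
        if (seqno != commitSeqno) {
            // Stale response: a newer commit was started after this one.
            outOfOrderCallbacks++;
            return;
        }
        if (error == null && committedOffsets != null) {
            lastCommittedOffsets = new HashMap<>(committedOffsets);
        }
    }

    Map<String, Long> lastCommittedOffsets() {
        return lastCommittedOffsets;
    }

    int outOfOrderCallbacks() {
        return outOfOrderCallbacks;
    }
}
```

With this shape, the callback body simply forwards its arguments, e.g. {{onCommitCompleted(error, seqno, offsets)}}, instead of assigning the field itself. For example, if commit 2 (offset 20 for "p0") completes before commit 1 (offset 10), the stale response for commit 1 is counted as out of order and {{lastCommittedOffsets()}} stays at {p0=20}, whereas the original callback would have left it at {p0=10}.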
> Connect WorkerSinkTask out of order offset commit can lead to inconsistent state
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-5731
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5731
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Jason Gustafson
>            Assignee: Randall Hauch
>             Fix For: 1.0.0
>

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)