[ https://issues.apache.org/jira/browse/KAFKA-5731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Randall Hauch updated KAFKA-5731:
---------------------------------
    Description: 
In Connect's WorkerSinkTask, we validate commit sequence numbers to ensure that
offset commits are handled in the right order
(https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L199).

Unfortunately, for asynchronous commits, the {{lastCommittedOffsets}} field is
overwritten regardless of this sequence check, as long as the response had no error
(https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L284):

{code:java}
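OffsetCommitCallback cb = new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) {
        if (error == null) {
            // Written before onCommitCompleted(...) checks seqno, so a stale
            // callback still replaces the most recently committed offsets.
            lastCommittedOffsets = offsets;
        }
        onCommitCompleted(error, seqno);
    }
};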
{code}
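To make the failure mode concrete, here is a minimal, self-contained sketch (Java 9+ for {{Map.of}}) that models the quoted callback. The {{BuggyCommitTracker}} class, its {{beginCommit}} helper, and the String-keyed offset maps are hypothetical stand-ins for {{WorkerSinkTask}}, {{TopicPartition}} and {{OffsetAndMetadata}}. It assumes a stale callback for an older commit is delivered after a newer commit has already completed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the quoted callback (not Connect's real class).
// Offsets are keyed by a plain "topic-partition" string instead of TopicPartition.
class BuggyCommitTracker {
    long commitSeqno = 0;
    Map<String, Long> lastCommittedOffsets = new HashMap<>();

    // Starts an asynchronous commit and returns its sequence number.
    long beginCommit() {
        return ++commitSeqno;
    }

    // Mirrors the quoted callback: the field is written before the seqno check runs.
    void onComplete(Map<String, Long> offsets, Exception error, long seqno) {
        if (error == null) {
            lastCommittedOffsets = offsets;
        }
        onCommitCompleted(error, seqno);
    }

    void onCommitCompleted(Exception error, long seqno) {
        if (seqno != commitSeqno) {
            // Stale callback: skipped here, but lastCommittedOffsets was already overwritten.
            return;
        }
        // Bookkeeping for the most recent commit would go here.
    }
}

public class OutOfOrderCommitDemo {
    static Map<String, Long> run() {
        BuggyCommitTracker task = new BuggyCommitTracker();
        long first = task.beginCommit();  // older commit, offset 10
        long second = task.beginCommit(); // newer commit, offset 20
        // The newer commit completes first; the stale callback for the older one arrives last.
        task.onComplete(Map.of("topic-0", 20L), null, second);
        task.onComplete(Map.of("topic-0", 10L), null, first);
        return task.lastCommittedOffsets;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints {topic-0=10}
    }
}
```

Running it prints the older offsets, even though the sequence check classified that callback as stale.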

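Hence, if commit callbacks are handled out of order, the internal state will be
inconsistent: a callback for an older commit that arrives after a newer one can roll
{{lastCommittedOffsets}} back to stale offsets, even though the sequence check in
{{onCommitCompleted(...)}} treats that callback as stale. To fix this, we should only
overwrite {{lastCommittedOffsets}} after sequence validation, as part of the
{{onCommitCompleted(...)}} method.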
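One possible shape for the proposed change, sketched as a hypothetical simplified model rather than the actual patch: the callback passes its offsets into {{onCommitCompleted(...)}}, which records them only after the sequence number matches the most recent commit. The extra {{committedOffsets}} parameter, the class names, and the String-keyed maps are assumptions for illustration (Java 9+ for {{Map.of}}).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the proposed fix (not Connect's real class).
class FixedCommitTracker {
    long commitSeqno = 0;
    Map<String, Long> lastCommittedOffsets = new HashMap<>();

    // Starts an asynchronous commit and returns its sequence number.
    long beginCommit() {
        return ++commitSeqno;
    }

    // The callback only forwards the offsets; it no longer writes the field itself.
    void onComplete(Map<String, Long> offsets, Exception error, long seqno) {
        onCommitCompleted(error, seqno, offsets);
    }

    void onCommitCompleted(Exception error, long seqno, Map<String, Long> committedOffsets) {
        if (seqno != commitSeqno) {
            // Stale callback: drop it without touching lastCommittedOffsets.
            return;
        }
        if (error == null) {
            // Only the most recent successful commit updates the snapshot.
            lastCommittedOffsets = committedOffsets;
        }
    }
}

public class OrderedCommitFix {
    static Map<String, Long> run() {
        FixedCommitTracker task = new FixedCommitTracker();
        long first = task.beginCommit();  // older commit, offset 10
        long second = task.beginCommit(); // newer commit, offset 20
        task.onComplete(Map.of("topic-0", 20L), null, second);
        task.onComplete(Map.of("topic-0", 10L), null, first); // stale, ignored
        return task.lastCommittedOffsets;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints {topic-0=20}
    }
}
```

With a stale callback for the older commit delivered last, it is dropped and {{lastCommittedOffsets}} keeps the newer offsets.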

  was:
In Connect's WorkerSinkTask, we do sequence number validation to ensure that 
offset commits are handled in the right order 
(https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L199).

Unfortunately, for asynchronous commits, the {{lastCommittedOffsets}} field is 
overridden regardless of this sequence check as long as the response had no 
error 
(https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L284).

Hence if we get an out of order commit, then the internal state will be 
inconsistent. To fix this, we should only override {{lastCommittedOffsets}} 
after sequence validation.


> Connect WorkerSinkTask out of order offset commit can lead to inconsistent state
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-5731
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5731
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Jason Gustafson
>            Assignee: Randall Hauch
>             Fix For: 1.0.0



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
