[ 
https://issues.apache.org/jira/browse/KAFKA-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-16087:
--------------------------------
    Affects Version/s: 3.2.0
                       2.6.0

> Tasks dropping incorrect records when errors.tolerance=all and errors 
> reported asynchronously due to data race
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16087
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16087
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.6.0, 3.2.0, 3.7.0
>            Reporter: Greg Harris
>            Assignee: Greg Harris
>            Priority: Major
>
> The ErrantRecordReporter introduced in KIP-610 (2.6.0) allows sink connectors 
> to push records to the connector DLQ topic. The implementation of this 
> reporter interacts with the ProcessingContext within the per-task 
> RetryWithToleranceOperator. The ProcessingContext stores mutable state about 
> the current operation, such as what error has occurred or what record is 
> being operated on.
> The ProcessingContext and RetryWithToleranceOperator is also used by the 
> converter and transformation pipeline of the connector for similar reasons. 
> When the ErrantRecordReporter#report function is called from SinkTask#put, 
> there is no contention over the mutable state, as the thread used for 
> SinkTask#put is also responsible for converting and transforming the record. 
> However, if ErrantRecordReporter#report is called by an extra thread within 
> the SinkTask, there is thread contention on the single mutable 
> ProcessingContext.
> This was noticed in https://issues.apache.org/jira/browse/KAFKA-10602 and the 
> synchronized keyword was added to all methods of RetryWithToleranceOperator 
> which interact with the ProcessingContext. However, this solution still 
> allows the RWTO methods to interleave, and produce unintended data races. 
> Consider the following interleaving:
> 1. Thread 1 converts and transforms record A successfully.
> 2. Thread 1 calls SinkTask#put(A) and delivers the message to the task.
> 3. Thread 1 queues some other thread 2 with some delay to call 
> ErrantRecordReporter#report(A).
> 4. Thread 1 returns from SinkTask#put and polls record B from the consumer.
> 5. Thread 1 calls RWTO#execute for a converter or transformation operation. 
> For example, [converting 
> headers|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L539]
> 6. The operation succeeds, and the ProcessingContext is left with error == 
> null, or equivalently failed() == false.
> 7. Thread 2 has it's delay expire, and it calls ErrantRecordReporter#report.
> 8. Thread 2 uses the WorkerErrantRecordReporter implementation, which calls 
> [RWTO 
> executeFailed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java#L109]
>  and returns.
> 9. The operation leaves ProcessingContext with error != null, or equivalently 
> failed() == true.
> 10. Thread 1 then resumes execution, and calls [RWTO 
> failed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L541]
>  which evaluates to true.
> 11. Thread 1 then drops Record B, even though the header conversion succeeded 
> without error.
> 12. Record B is never delivered to the Sink Task, and never delivered to the 
> error reporter for processing, despite having produced no error during 
> processing.
> This per-method synchronization for returning nulls and errors separately is 
> insufficient, and either the data sharing should be avoided or a different 
> locking mechanism should be used.
> A similar flaw exists in source connectors and asynchronous errors reported 
> by the producer, and was introduced in KIP-779 (3.2.0)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to