[ https://issues.apache.org/jira/browse/KAFKA-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Greg Harris updated KAFKA-16087:
--------------------------------
    Summary: Tasks dropping incorrect records when errors.tolerance=all and errors reported asynchronously due to data race  (was: Sink connector dropping incorrect record when ErrantRecordReporter used asynchronously)

> Tasks dropping incorrect records when errors.tolerance=all and errors
> reported asynchronously due to data race
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16087
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16087
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.7.0
>            Reporter: Greg Harris
>            Assignee: Greg Harris
>            Priority: Major
>
> The ErrantRecordReporter introduced in KIP-610 allows sink connectors to push
> records to the connector DLQ topic. The implementation of this reporter
> interacts with the ProcessingContext within the per-task
> RetryWithToleranceOperator (RWTO). The ProcessingContext stores mutable state
> about the current operation, such as what error has occurred or what record
> is being operated on.
>
> The ProcessingContext and RetryWithToleranceOperator are also used by the
> converter and transformation pipeline of the connector for similar reasons.
>
> When the ErrantRecordReporter#report function is called from SinkTask#put,
> there is no contention over the mutable state, as the thread used for
> SinkTask#put is also responsible for converting and transforming the record.
> However, if ErrantRecordReporter#report is called by an extra thread within
> the SinkTask, there is thread contention on the single mutable
> ProcessingContext.
>
> This was noticed in https://issues.apache.org/jira/browse/KAFKA-10602 and the
> synchronized keyword was added to all methods of RetryWithToleranceOperator
> that interact with the ProcessingContext. However, this solution still allows
> RWTO method calls from different threads to interleave, producing unintended
> data races.
>
> Consider the following interleaving:
> 1. Thread 1 converts and transforms record A successfully.
> 2. Thread 1 calls SinkTask#put(A) and delivers the message to the task.
> 3. Thread 1 schedules another thread, thread 2, to call ErrantRecordReporter#report(A) after some delay.
> 4. Thread 1 returns from SinkTask#put and polls record B from the consumer.
> 5. Thread 1 calls RWTO#execute for a converter or transformation operation, for example [converting headers|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L539].
> 6. The operation succeeds, and the ProcessingContext is left with error == null, or equivalently failed() == false.
> 7. Thread 2's delay expires, and it calls ErrantRecordReporter#report.
> 8. Thread 2 uses the WorkerErrantRecordReporter implementation, which calls [RWTO executeFailed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java#L109] and returns.
> 9. The operation leaves the ProcessingContext with error != null, or equivalently failed() == true.
> 10. Thread 1 then resumes execution and calls [RWTO failed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L541], which evaluates to true.
> 11. Thread 1 then drops record B, even though the header conversion succeeded without error.
> 12. Record B is never delivered to the sink task, and never delivered to the error reporter for processing, despite having produced no error during processing.
>
> Synchronizing each RWTO method individually is insufficient, because the
> outcome of an operation is reported in two parts: RWTO#execute returns null
> on failure, while the error itself is read back through a separate call such
> as RWTO#failed, and another thread can modify the shared state between those
> calls. Either the data sharing should be avoided or a different locking
> mechanism should be used.
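The interleaving in steps 5 to 10 can be replayed deterministically with a small model. The sketch below is a hypothetical simplification, not the real Kafka Connect code: `RacyOperator` stands in for the RWTO and its shared ProcessingContext (a single mutable error slot behind per-method `synchronized`), while `IsolatedOperator` and `OperationResult` illustrate one way of avoiding the data sharing by returning each operation's outcome directly to its caller. All class and method names here are assumptions made for illustration, and the calls are replayed on one thread in the problematic order rather than by actually racing two threads.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the RWTO plus its shared ProcessingContext.
// Not the real Kafka Connect class; only the shape of the problem is modeled.
final class RacyOperator {
    // One mutable error slot shared by every caller, like the per-task ProcessingContext.
    private Throwable error;

    // Each method is synchronized on its own, mirroring the KAFKA-10602 approach.
    synchronized <T> T execute(Supplier<T> operation) {
        error = null;
        try {
            return operation.get();
        } catch (RuntimeException e) {
            error = e;   // the failure is stored in shared state...
            return null; // ...and signalled to the caller only by a null return
        }
    }

    // Called by the reporter thread on behalf of an earlier record.
    synchronized void executeFailed(Throwable reportedError) {
        error = reportedError;
    }

    synchronized boolean failed() {
        return error != null;
    }
}

// Hypothetical per-call outcome: value and error travel together, so no other
// caller can change the answer after the operation has finished.
final class OperationResult<T> {
    final T value;
    final Throwable error;

    private OperationResult(T value, Throwable error) {
        this.value = value;
        this.error = error;
    }

    static <T> OperationResult<T> ok(T value) { return new OperationResult<>(value, null); }

    static <T> OperationResult<T> fail(Throwable error) { return new OperationResult<>(null, error); }

    boolean failed() { return error != null; }
}

// Hypothetical operator with no shared mutable state at all.
final class IsolatedOperator {
    <T> OperationResult<T> execute(Supplier<T> operation) {
        try {
            return OperationResult.ok(operation.get());
        } catch (RuntimeException e) {
            return OperationResult.fail(e);
        }
    }
}

final class InterleavingDemo {
    // Replays steps 5-10 against the shared-state model; returns true if B is dropped.
    static boolean racyDropsRecordB() {
        RacyOperator rwto = new RacyOperator();
        rwto.execute(() -> "headers-of-B");                    // steps 5-6: succeeds, error == null
        rwto.executeFailed(new RuntimeException("report(A)")); // steps 7-9: reporter thread, record A
        return rwto.failed();                                  // step 10: true, so B is dropped
    }

    // Same sequence against the isolated model; returns true if B is dropped.
    static boolean isolatedDropsRecordB() {
        IsolatedOperator op = new IsolatedOperator();
        OperationResult<String> headersB = op.execute(() -> "headers-of-B");
        // Steps 7-9 analogue: the failure for record A is returned to its own caller
        // and is never written into state that record B's caller reads.
        op.execute(() -> { throw new IllegalStateException("report(A)"); });
        return headersB.failed();
    }

    public static void main(String[] args) {
        System.out.println("shared-state model drops B: " + racyDropsRecordB());   // true
        System.out.println("isolated model drops B:     " + isolatedDropsRecordB()); // false
    }
}
```

In this simplified model, the other option named in the issue, a different locking mechanism, could take the form of the task thread holding the operator's monitor across the whole execute-then-check sequence (`synchronized (rwto) { ... }`). Because Java monitors are reentrant and the `synchronized` methods lock the same object, the reporter thread's `executeFailed` could then no longer land between the two calls, at the cost of blocking the reporter thread while the task thread converts records.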
>
> A similar flaw exists in source connectors, for errors reported
> asynchronously by the producer.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)