Greg Harris created KAFKA-16087:
-----------------------------------
Summary: Sink connector dropping incorrect record when
ErrantRecordReporter used asynchronously
Key: KAFKA-16087
URL: https://issues.apache.org/jira/browse/KAFKA-16087
Project: Kafka
Issue Type: Bug
Affects Versions: 3.7.0
Reporter: Greg Harris
Assignee: Greg Harris
The ErrantRecordReporter introduced in KIP-610 allows sink connectors to push
records to the connector DLQ topic. The implementation of this reporter
interacts with the ProcessingContext within the per-task
RetryWithToleranceOperator. The ProcessingContext stores mutable state about
the current operation, such as what error has occurred or what record is being
operated on.
The ProcessingContext and RetryWithToleranceOperator are also used by the
converter and transformation pipeline of the connector for similar reasons.
When the ErrantRecordReporter#report function is called from SinkTask#put,
there is no contention over the mutable state, as the thread used for
SinkTask#put is also responsible for converting and transforming the record.
However, if ErrantRecordReporter#report is called by an extra thread within the
SinkTask, there is thread contention on the single mutable ProcessingContext.
This was noticed in https://issues.apache.org/jira/browse/KAFKA-10602 and the
synchronized keyword was added to all methods of RetryWithToleranceOperator
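which interact with the ProcessingContext. However, this solution only makes
each individual method atomic. Calls to RetryWithToleranceOperator (RWTO)
methods from different threads can still interleave and produce unintended
race conditions on the shared state. Consider the following interleaving: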
1. Thread 1 converts and transforms record A successfully.
2. Thread 1 calls SinkTask#put(A) and delivers the message to the task.
3. Thread 1 schedules a second thread, thread 2, to call
ErrantRecordReporter#report(A) after some delay.
4. Thread 1 returns from SinkTask#put and polls record B from the consumer.
5. Thread 1 calls RWTO#execute for a converter or transformation operation, for
example
[converting headers|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L539].
6. The operation succeeds, and the ProcessingContext is left with error ==
null, or equivalently failed() == false.
7. Thread 2's delay expires, and it calls ErrantRecordReporter#report.
8. Thread 2 uses the WorkerErrantRecordReporter implementation, which calls
[RWTO executeFailed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java#L109]
and returns.
9. The operation leaves ProcessingContext with error != null, or equivalently
failed() == true.
10. Thread 1 then resumes execution, and calls
[RWTO failed|https://github.com/apache/kafka/blob/c0b649345580e4dfb2ebb88d3aaace71afe70d75/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L541]
which evaluates to true.
11. Thread 1 then drops Record B, even though the header conversion succeeded
without error.
12. Record B is never delivered to the Sink Task, and never delivered to the
error reporter for processing, despite having produced no error during
processing.
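The interleaving above can be replayed with a small, self-contained sketch.
This is not the Kafka Connect code: SimplifiedOperator is a hypothetical
stand-in for RetryWithToleranceOperator, and a single error field stands in
for the ProcessingContext. Because every method is individually synchronized,
the interleaving happens at method granularity, so the sketch replays steps 5
through 11 sequentially instead of starting real threads.

```java
import java.util.function.Supplier;

class SharedContextRace {

    /** Hypothetical, heavily simplified stand-in for RetryWithToleranceOperator. */
    static class SimplifiedOperator {
        // Stands in for the single mutable ProcessingContext shared by all callers.
        private Throwable error;

        // Runs an operation and records whether it failed (assumed behaviour).
        synchronized <T> T execute(Supplier<T> operation) {
            error = null;
            try {
                return operation.get();
            } catch (RuntimeException e) {
                error = e;
                return null;
            }
        }

        // Records an error reported from outside the conversion pipeline.
        synchronized void executeFailed(Throwable reported) {
            error = reported;
        }

        // Reads the shared error state in a separate synchronized call.
        synchronized boolean failed() {
            return error != null;
        }
    }

    /** Replays steps 5-11 and returns true if record B would be dropped. */
    static boolean replayInterleaving() {
        SimplifiedOperator operator = new SimplifiedOperator();

        // Steps 5-6: thread 1 converts the headers of record B successfully.
        String headersOfB = operator.execute(() -> "headers-of-B");

        // Steps 7-9: thread 2 reports record A between thread 1's two calls.
        operator.executeFailed(new RuntimeException("record A failed in the sink"));

        // Steps 10-11: thread 1 checks failed() and sees thread 2's error.
        boolean conversionSucceeded = headersOfB != null;
        return conversionSucceeded && operator.failed();
    }

    public static void main(String[] args) {
        System.out.println("record B dropped: " + replayInterleaving());
    }
}
```

Running main prints "record B dropped: true": the conversion of B returned a
value, yet the later failed() check observes the error that belongs to A.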
Synchronizing each method separately is insufficient, because the result of an
operation and its error state are read in separate calls. Either the data
sharing should be avoided or a different locking mechanism should be used.
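As an illustration of the first option, avoiding the data sharing, the sketch
below returns the error state together with the result of each operation. It
is only one possible shape and is not taken from the Kafka code base or from
any patch for this issue: Outcome and StatelessOperator are hypothetical
names. With no shared mutable field, a report for record A cannot change what
thread 1 observes for record B.

```java
import java.util.function.Supplier;

class PerOperationContext {

    /** Hypothetical immutable result of one operation: a value or an error. */
    static final class Outcome<T> {
        final T value;
        final Throwable error;

        Outcome(T value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        boolean failed() {
            return error != null;
        }
    }

    /** Hypothetical operator that keeps no state between calls. */
    static class StatelessOperator {
        // Each call returns its own Outcome instead of mutating a shared context.
        <T> Outcome<T> execute(Supplier<T> operation) {
            try {
                return new Outcome<>(operation.get(), null);
            } catch (RuntimeException e) {
                return new Outcome<>(null, e);
            }
        }

        // An externally reported error also gets its own Outcome.
        <T> Outcome<T> executeFailed(Throwable reported) {
            return new Outcome<>(null, reported);
        }
    }

    /** Replays the same interleaving; returns true if record B would be dropped. */
    static boolean replayInterleaving() {
        StatelessOperator operator = new StatelessOperator();

        // Thread 1 converts the headers of record B successfully.
        Outcome<String> headersOfB = operator.execute(() -> "headers-of-B");

        // Thread 2 reports record A; this creates a separate Outcome.
        Outcome<Object> reportOfA =
            operator.executeFailed(new RuntimeException("record A failed in the sink"));

        // Thread 1 checks the Outcome that belongs to record B only.
        return reportOfA.failed() && headersOfB.failed();
    }

    public static void main(String[] args) {
        System.out.println("record B dropped: " + replayInterleaving());
    }
}
```

Here main prints "record B dropped: false". The other option, a lock held
across both the execute call and the failed() check, would also prevent the
interleaving, at the cost of blocking reporter threads while a record is being
converted.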
--
This message was sent by Atlassian Jira
(v8.20.10#820010)