Denis Rystsov created KAFKA-14034:
-------------------------------------

             Summary: Consistency violation: enabled idempotency doesn't 
prevent duplicates
                 Key: KAFKA-14034
                 URL: https://issues.apache.org/jira/browse/KAFKA-14034
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 3.1.1, 3.0.0
            Reporter: Denis Rystsov


Hey folks, I've observed duplicated records in the log while idempotency was 
enabled, and it looks like the Kafka client is the culprit. I've tested on 
3.0.0, but the tip of the Kafka repo is also affected.

Suppose a user sends two produce requests without waiting for the first one to 
complete, so there are two in-flight requests:
{code:java}
producer.send(A)
producer.send(B){code}
Suppose the first request fails with a retriable error after it was written to 
disk, and the second request fails with UNKNOWN_SERVER_ERROR. Any unhandled 
exception on the broker side results in UNKNOWN_SERVER_ERROR, so this can 
happen.
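
For context, the scenario needs an idempotent producer that is allowed more 
than one in-flight request per connection. Below is a minimal sketch of such 
settings using plain java.util.Properties and the standard producer config key 
names. The class name and the concrete values are assumptions for 
illustration; they are not taken from the reproduction.

```java
import java.util.Properties;

/** Illustrative settings only; the class name and values are assumptions. */
final class IdempotentProducerSettings {

    /** Builds producer properties with idempotence on and several in-flight requests allowed. */
    static Properties build() {
        Properties props = new Properties();
        // Idempotence is the feature whose guarantee is in question here.
        props.setProperty("enable.idempotence", "true");
        // Idempotence requires acks=all.
        props.setProperty("acks", "all");
        // More than one in-flight request, so A and B can be outstanding together.
        props.setProperty("max.in.flight.requests.per.connection", "5");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```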

Since request A failed with a retriable error, it is put back into the 
outbound queue here:
[https://github.com/apache/kafka/blob/1daa149730e3c56b7b6fe8f14369178273e8efc4/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L623]

Suppose B's UNKNOWN_SERVER_ERROR is received before A is retried. It is 
processed in the following methods:
 * 
[https://github.com/apache/kafka/blob/1daa149730e3c56b7b6fe8f14369178273e8efc4/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L642]
 * 
[https://github.com/apache/kafka/blob/1daa149730e3c56b7b6fe8f14369178273e8efc4/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L742]
 * 
[https://github.com/apache/kafka/blob/1daa149730e3c56b7b6fe8f14369178273e8efc4/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L761]
 * 
[https://github.com/apache/kafka/blob/1daa149730e3c56b7b6fe8f14369178273e8efc4/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L624]
 * maybeTransitionToErrorState doesn't consider UNKNOWN_SERVER_ERROR fatal so 
it doesn't mark the request as such: 
[https://github.com/apache/kafka/blob/1daa149730e3c56b7b6fe8f14369178273e8efc4/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L611]
 * as a result, handleFailedBatch requests an epoch bump: 
[https://github.com/apache/kafka/blob/1daa149730e3c56b7b6fe8f14369178273e8efc4/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L652]

When the epoch is bumped, the client rewrites the sequence numbers of the 
in-flight requests: 
[https://github.com/apache/kafka/blob/1daa149730e3c56b7b6fe8f14369178273e8efc4/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L481]

In our case it rewrites A's sequence numbers, and when the request is retried 
the broker can't dedupe it and writes it to the log, thus violating the 
idempotency guarantees.
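
The sequence of events above can be replayed against a toy model. This is a 
sketch, not Kafka code: ToyBroker, replay and the dedup rule (reject a 
sequence that is not greater than the last one seen for the same producer id 
and epoch) are simplified assumptions. B is modeled as never reaching the log, 
which the report does not state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of dedup keyed by producer id, epoch and sequence. Not Kafka code. */
final class DedupSketch {

    /** Hypothetical single-partition broker remembering the last sequence per (producerId, epoch). */
    static final class ToyBroker {
        final List<String> log = new ArrayList<>();
        private final Map<String, Integer> lastSequence = new HashMap<>();

        /** Appends the value unless its sequence was already seen for this id and epoch. */
        boolean append(long producerId, int epoch, int sequence, String value) {
            String key = producerId + ":" + epoch;
            Integer last = lastSequence.get(key);
            if (last != null && sequence <= last) {
                return false; // recognized as a duplicate, not written again
            }
            lastSequence.put(key, sequence);
            log.add(value);
            return true;
        }
    }

    /** Replays the scenario and returns the resulting log contents. */
    static List<String> replay(boolean bumpEpochBeforeRetry) {
        ToyBroker broker = new ToyBroker();
        long producerId = 1L;
        int epoch = 0;
        int sequenceOfA = 0;

        // First attempt of A is written, but the client sees a retriable error.
        broker.append(producerId, epoch, sequenceOfA, "A");

        // B fails with UNKNOWN_SERVER_ERROR; assumed here not to be written.
        if (bumpEpochBeforeRetry) {
            epoch += 1;      // epoch bump requested after B's failure
            sequenceOfA = 0; // A's sequence is rewritten under the new epoch
        }

        // Retry of A with whatever epoch and sequence the client now holds.
        broker.append(producerId, epoch, sequenceOfA, "A");
        return broker.log;
    }

    public static void main(String[] args) {
        System.out.println(replay(false)); // prints [A]
        System.out.println(replay(true));  // prints [A, A]
    }
}
```

Without the epoch bump the retry carries the original epoch and sequence, and 
the toy broker drops it. With the bump the retry looks like a new record, so A 
appears twice.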



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
