xuexiaoyue created KAFKA-13794:
----------------------------------
Summary: In flight batch being deleted silently by another
completed batch
Key: KAFKA-13794
URL: https://issues.apache.org/jira/browse/KAFKA-13794
Project: Kafka
Issue Type: Bug
Reporter: xuexiaoyue
When idempotence is enabled, a batch that reaches request.timeout.ms but has not yet reached delivery.timeout.ms is retried and waits for another request.timeout.ms. During this interval, delivery.timeout.ms may be reached. The Sender then removes the in-flight batch and bumps the producer epoch because of the unresolved sequence, and the sequence for this partition is reset to 0.
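To make the timing window concrete, here is a minimal Java sketch of when delivery.timeout.ms can fire while a retry is still in flight. It is a simplified model, not producer code. It assumes linger.ms = 0, so the first attempt is sent when the batch is created. It also assumes a constant retry.backoff.ms and a broker that never answers, so every attempt waits the full request.timeout.ms. The class and method names are hypothetical.

{code:java}
// Hypothetical timeline model. Assumptions: linger.ms = 0, constant retry.backoff.ms,
// no broker response ever arrives, so each attempt waits the full request.timeout.ms.
public class DeliveryTimeoutWindow {

    // Returns the 1-based attempt that is still awaiting a response when
    // delivery.timeout.ms fires, or 0 if it fires during a backoff gap.
    public static int attemptInFlightAtDeliveryTimeout(long requestTimeoutMs,
                                                       long retryBackoffMs,
                                                       long deliveryTimeoutMs) {
        if (requestTimeoutMs <= 0) {
            throw new IllegalArgumentException("requestTimeoutMs must be positive");
        }
        long sendTime = 0; // time at which the current attempt is sent
        for (int attempt = 1; ; attempt++) {
            if (deliveryTimeoutMs < sendTime) {
                return 0; // expired while waiting to retry; nothing in flight
            }
            long timeoutAt = sendTime + requestTimeoutMs;
            if (deliveryTimeoutMs < timeoutAt) {
                return attempt; // this attempt is in flight when the batch expires
            }
            sendTime = timeoutAt + retryBackoffMs; // next retry after the backoff
        }
    }

    public static void main(String[] args) {
        System.out.println(attemptInFlightAtDeliveryTimeout(30000, 100, 40000));
        System.out.println(attemptInFlightAtDeliveryTimeout(30000, 100, 120000));
    }
}
{code}

In this model, any result of 2 or more means the batch expires while a retry is in flight, which is the situation described above. With 30000, 100 and 120000 (the producer defaults for request.timeout.ms, retry.backoff.ms and delivery.timeout.ms), the model returns 4: the batch expires during its third retry.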
If a new batch is then sent to the same partition and the former batch reaches request.timeout.ms again, NetworkClient throws the following exception:
{code:java}
[ERROR] [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Uncaught error in request completion:
java.lang.IllegalStateException: We are re-enqueueing a batch which is not tracked as part of the in flight requests. batch.topicPartition: txn_test_1648891362900-2; batch.baseSequence: 0
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.insertInSequenceOrder(RecordAccumulator.java:388) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.reenqueue(RecordAccumulator.java:334) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
	at org.apache.kafka.clients.producer.internals.Sender.reenqueueBatch(Sender.java:668) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
	at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:622) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
	at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:548) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
	at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$5(Sender.java:836) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
	at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
	at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
	at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
	at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_102]
{code}
The cause is that entries in inflightBatchesBySequence in TransactionManager are not removed correctly: one batch may be removed by another batch that has the same sequence number.
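The following simplified Java model shows one way this can happen. It is not the actual TransactionManager code. Suppose the per-partition in-flight set is a TreeSet ordered by base sequence only. A stale batch and a new batch that both carry sequence 0 then compare as equal, so removing the stale one silently removes the live one. The Batch class, the remainingAfterStaleRemoval helper and the epoch tie-break used as a contrast are assumptions for illustration. They are not taken from Kafka or from any upstream fix.

{code:java}
import java.util.Comparator;
import java.util.TreeSet;

// Hypothetical model of a per-partition in-flight set; Batch stands in for ProducerBatch.
public class InflightRemovalDemo {

    static final class Batch {
        final int baseSequence;
        final short epoch;

        Batch(int baseSequence, short epoch) {
            this.baseSequence = baseSequence;
            this.epoch = epoch;
        }
    }

    // Replays the reported sequence of events and returns how many batches are
    // still tracked afterwards. tieBreakOnEpoch=false orders by sequence only.
    public static int remainingAfterStaleRemoval(boolean tieBreakOnEpoch) {
        Comparator<Batch> bySequence = Comparator.comparingInt(b -> b.baseSequence);
        Comparator<Batch> order = tieBreakOnEpoch
                ? bySequence.thenComparingInt(b -> b.epoch) // hypothetical tie-break
                : bySequence;
        TreeSet<Batch> inflight = new TreeSet<>(order);

        Batch stale = new Batch(0, (short) 0);
        inflight.add(stale);    // first batch in flight with sequence 0
        inflight.remove(stale); // delivery.timeout.ms expires it; epoch bumped, sequence reset

        Batch live = new Batch(0, (short) 1);
        inflight.add(live);     // new batch reuses sequence 0 under the new epoch

        inflight.remove(stale); // late completion of the stale batch removes "itself"
        return inflight.size();
    }

    public static void main(String[] args) {
        System.out.println("sequence-only order: " + remainingAfterStaleRemoval(false));
        System.out.println("sequence+epoch order: " + remainingAfterStaleRemoval(true));
    }
}
{code}

With sequence-only ordering, the live batch is no longer tracked (0 remaining). That would be consistent with the IllegalStateException when the live batch is later re-enqueued. With the hypothetical epoch tie-break, it survives (1 remaining).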
The potential consequence I can think of is that sending will be blocked until the latter batch expires after delivery.timeout.ms.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)