[ https://issues.apache.org/jira/browse/KAFKA-13794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
xuexiaoyue updated KAFKA-13794:
-------------------------------
Description:
When idempotence is enabled and a batch reaches its request.timeout.ms but
has not yet reached delivery.timeout.ms, it is retried and waits for another
request.timeout.ms. During this interval, delivery.timeout.ms may be reached.
The Sender then removes this in-flight batch and bumps the producer epoch
because of the unresolved sequence, and the sequence of this partition is
reset to 0.
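To make the timing concrete, here is a rough timeline sketch. The two timeout values are hypothetical and chosen only for illustration (they are not the settings from this report), and the model ignores linger.ms and retry backoff, so it is a simplification rather than a description of the Sender's actual bookkeeping.

```java
public class TimeoutTimelineSketch {
    // Hypothetical values for illustration only.
    static final long REQUEST_TIMEOUT_MS = 10_000L;
    static final long DELIVERY_TIMEOUT_MS = 15_000L;

    // Time (ms after the first send) at which the n-th attempt (1-based)
    // hits request.timeout.ms, assuming each retry is sent immediately.
    static long attemptTimesOutAt(int attempt) {
        return attempt * REQUEST_TIMEOUT_MS;
    }

    // True if delivery.timeout.ms elapses while the given attempt is still
    // waiting for its response.
    static boolean expiresDuringAttempt(int attempt) {
        long start = attemptTimesOutAt(attempt - 1);
        long end = attemptTimesOutAt(attempt);
        return start <= DELIVERY_TIMEOUT_MS && DELIVERY_TIMEOUT_MS < end;
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 2; attempt++) {
            System.out.println("attempt " + attempt
                + " times out at " + attemptTimesOutAt(attempt) + " ms"
                + ", batch expires during it: " + expiresDuringAttempt(attempt));
        }
    }
}
```

With these assumed values, the batch is expired by delivery.timeout.ms while its second request is still outstanding, which is the window described above.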
At this point, if a new batch is sent to the same partition and the former
batch reaches request.timeout.ms again, NetworkClient reports the following
exception:
{code:java}
[ERROR] [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Uncaught error in request completion:
java.lang.IllegalStateException: We are re-enqueueing a batch which is not tracked as part of the in flight requests. batch.topicPartition: txn_test_1648891362900-2; batch.baseSequence: 0
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.insertInSequenceOrder(RecordAccumulator.java:388) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.reenqueue(RecordAccumulator.java:334) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
    at org.apache.kafka.clients.producer.internals.Sender.reenqueueBatch(Sender.java:668) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:622) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
    at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:548) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
    at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$5(Sender.java:836) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
    at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_102]
{code}
The cause is that entries in inflightBatchesBySequence in TransactionManager
are not removed correctly: one batch may be removed by another batch with the
same sequence number.
The potential consequence I can think of is that sending will be blocked
until the latter batch is expired by delivery.timeout.ms.
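The removal problem can be illustrated with a small standalone sketch. It assumes the in-flight set is a sorted set ordered only by base sequence; the Batch class and method names are hypothetical stand-ins, not Kafka's ProducerBatch or TransactionManager code.

```java
import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;

public class InflightRemovalSketch {
    // Hypothetical stand-in for a producer batch.
    static final class Batch {
        final String name;
        final int baseSequence;

        Batch(String name, int baseSequence) {
            this.name = name;
            this.baseSequence = baseSequence;
        }
    }

    // Assumption: the set is ordered by base sequence alone, so two distinct
    // batches with the same sequence are treated as the same element.
    static SortedSet<Batch> newInflightSet() {
        return new TreeSet<>(Comparator.comparingInt((Batch b) -> b.baseSequence));
    }

    // Returns true if the new batch is still tracked after the old batch,
    // which carries the same base sequence, is removed on completion.
    static boolean newBatchStillTracked() {
        SortedSet<Batch> inflight = newInflightSet();
        Batch oldBatch = new Batch("old", 0); // already expired from tracking
        Batch newBatch = new Batch("new", 0); // sent after the sequence reset
        inflight.add(newBatch);
        inflight.remove(oldBatch); // late completion of the old batch
        for (Batch b : inflight) {
            if (b == newBatch) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("new batch still tracked: " + newBatchStillTracked());
    }
}
```

Because the comparator sees both batches as equal, removing the old batch silently drops the new one from the set, and a later attempt to re-enqueue the new batch finds it untracked.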
> In flight batch being deleted silently by another completed batch
> -----------------------------------------------------------------
>
> Key: KAFKA-13794
> URL: https://issues.apache.org/jira/browse/KAFKA-13794
> Project: Kafka
> Issue Type: Bug
> Reporter: xuexiaoyue
> Priority: Major
>