[ 
https://issues.apache.org/jira/browse/KAFKA-13794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuexiaoyue updated KAFKA-13794:
-------------------------------
    Description: 
Under the case of idempotence is enabled, when a batch reaches its 
request.timeout.ms but not yet reaches delivery.timeout.ms, it will be retried 
and wait for another request.timeout.ms. During the time of this interval, the 
delivery.timeout.ms may be reached and Sender will remove this in flight batch 
and bump the producer epoch because of the unresolved sequence, then the 
sequence of this partition will be reset to 0.

At this time, if a new batch is sent to the same partition and the former batch 
reaches request.timeout.ms again, we will see an exception being thrown out by 
NetworkClient:
{code:java}
[ERROR] [kafka-producer-network-thread | producer-1] 
org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] 
Uncaught error in request completion:
 java.lang.IllegalStateException: We are re-enqueueing a batch which is not 
tracked as part of the in flight requests. batch.topicPartition: 
txn_test_1648891362900-2; batch.baseSequence: 0
   at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.insertInSequenceOrder(RecordAccumulator.java:388)
 ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
   at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.reenqueue(RecordAccumulator.java:334)
 ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
   at 
org.apache.kafka.clients.producer.internals.Sender.reenqueueBatch(Sender.java:668)
 ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
   at 
org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:622)
 ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
   at 
org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:548)
 ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
   at 
org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$5(Sender.java:836)
 ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
   at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) 
~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
   at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583)
 ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575) 
~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
   at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328) 
~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) 
~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
   at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_102] {code}
The cause of this is the inflightBatchesBySequence in TransactionManager is not 
being remove correctly. One batch may be removed by another batch with the same 
sequence number.

The potential consequence of this I can think out is that the send progress 
will be blocked until the latter batch being expired by delivery.timeout.ms

 

  was:
Under the case of idempotence is enabled. When a batch reaches its 
request.timeout.ms but not yet reaches delivery.timeout.ms, it will be retried 
and wait for another request.timeout.ms. During the time of this interval, the 
delivery.timeout.ms may be reached and Sender will remove this in flight batch 
and bump the producer epoch because of the unresolved sequence, then the 
sequence of this partition will be reset to 0.

At this time, if a new batch is sent to the same partition and the former batch 
reaches request.timeout.ms again. We will see the exception being thrown out by 
NetworkClient:
{code:java}
[ERROR] [kafka-producer-network-thread | producer-1] 
org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] 
Uncaught error in request completion:
 java.lang.IllegalStateException: We are re-enqueueing a batch which is not 
tracked as part of the in flight requests. batch.topicPartition: 
txn_test_1648891362900-2; batch.baseSequence: 0
   at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.insertInSequenceOrder(RecordAccumulator.java:388)
 ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
   at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.reenqueue(RecordAccumulator.java:334)
 ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
   at 
org.apache.kafka.clients.producer.internals.Sender.reenqueueBatch(Sender.java:668)
 ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
   at 
org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:622)
 ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
   at 
org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:548)
 ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
   at 
org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$5(Sender.java:836)
 ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
   at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) 
~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
   at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583)
 ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575) 
~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
   at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328) 
~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) 
~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
   at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_102] {code}
The cause of this is the inflightBatchesBySequence in TransactionManager is not 
being remove correctly. One batch may be removed by another batch with the same 
sequence number.

The potential consequence of this I can think out is that the send progress 
will be blocked until the latter batch being expired by delivery.timeout.ms

 


> In flight batch being deleted silently by another completed batch
> -----------------------------------------------------------------
>
>                 Key: KAFKA-13794
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13794
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: xuexiaoyue
>            Priority: Major
>
> Under the case of idempotence is enabled, when a batch reaches its 
> request.timeout.ms but not yet reaches delivery.timeout.ms, it will be 
> retried and wait for another request.timeout.ms. During the time of this 
> interval, the delivery.timeout.ms may be reached and Sender will remove this 
> in flight batch and bump the producer epoch because of the unresolved 
> sequence, then the sequence of this partition will be reset to 0.
> At this time, if a new batch is sent to the same partition and the former 
> batch reaches request.timeout.ms again, we will see an exception being thrown 
> out by NetworkClient:
> {code:java}
> [ERROR] [kafka-producer-network-thread | producer-1] 
> org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] 
> Uncaught error in request completion:
>  java.lang.IllegalStateException: We are re-enqueueing a batch which is not 
> tracked as part of the in flight requests. batch.topicPartition: 
> txn_test_1648891362900-2; batch.baseSequence: 0
>    at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.insertInSequenceOrder(RecordAccumulator.java:388)
>  ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.reenqueue(RecordAccumulator.java:334)
>  ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at 
> org.apache.kafka.clients.producer.internals.Sender.reenqueueBatch(Sender.java:668)
>  ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:622)
>  ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:548)
>  ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$5(Sender.java:836)
>  ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) 
> ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583)
>  ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575) 
> ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328) 
> ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) 
> ~[kafka-transaction-test-1.0-SNAPSHOT.jar:?]
>    at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_102] {code}
> The cause of this is the inflightBatchesBySequence in TransactionManager is 
> not being remove correctly. One batch may be removed by another batch with 
> the same sequence number.
> The potential consequence of this I can think out is that the send progress 
> will be blocked until the latter batch being expired by delivery.timeout.ms
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to