[ 
https://issues.apache.org/jira/browse/KAFKA-16221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16221:
------------------------------------
    Description: 
https://issues.apache.org/jira/browse/KAFKA-14831 fixed a producer bug in the 
internal TX state transition; the producer now throws an IllegalStateException 
in situations where it previously swallowed an internal error.

This change surfaces a bug in Kafka Streams: Kafka Streams calls 
`abortTransaction()` blindly when a task is closed dirty, even if the Producer 
is already in an internal fatal state. If the Producer is in a fatal state, 
Kafka Streams should instead skip `abortTransaction()` and only `close()` the 
Producer when closing the task dirty.

The bug surfaces after `commitTransaction()` times out, or after an 
`InvalidProducerEpochException` from a `send()` call; both lead to the call to 
`abortTransaction()`. Kafka Streams currently does not track whether a 
commit-TX is in progress.
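One way to add such tracking, sketched here with a hypothetical 
`commitInProgress` flag (this field does not exist in Kafka Streams today), 
would be:
{code:java}
import org.apache.kafka.clients.producer.Producer;

// Hypothetical bookkeeping; field and method names are illustrative only.
class CommitTrackingSketch {
    private boolean commitInProgress = false;

    void commit(final Producer<byte[], byte[]> producer) {
        commitInProgress = true;
        producer.commitTransaction(); // may throw, e.g. a TimeoutException
        commitInProgress = false;     // only reached if the commit succeeded
    }

    boolean mayAbortTransaction() {
        // A dirty close could consult this flag and skip abortTransaction()
        // if a commit is still pending (i.e. it timed out or failed).
        return !commitInProgress;
    }
}
{code}
Without such tracking, the blind abort currently results in: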
{code:java}
java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` 
because the previous call to `commitTransaction` timed out and must be retried
    at 
org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1203)
    at 
org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:326)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274) 
{code}
and
{code:java}
[2024-01-16 04:19:32,584] ERROR [kafka-producer-network-thread | 
i-01aea6907970b1bf6-StreamThread-1-producer] stream-thread 
[i-01aea6907970b1bf6-StreamThread-1] stream-task [1_2] Error encountered 
sending record to topic joined-counts for task 1_2 due to:
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
attempted to produce with an old epoch.
Written offsets would not be recorded and no more records would be sent since 
the producer is fenced, indicating the task may be migrated out 
(org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
attempted to produce with an old epoch.

// followed by

[2024-01-16 04:19:32,587] ERROR [kafka-producer-network-thread | 
i-01aea6907970b1bf6-StreamThread-1-producer] [Producer 
clientId=i-01aea6907970b1bf6-StreamThread-1-producer, 
transactionalId=stream-soak-test-bbb995dc-1ba2-41ed-8791-0512ab4b904d-1] 
Aborting producer batches due to fatal error 
(org.apache.kafka.clients.producer.internals.Sender)
java.lang.IllegalStateException: TransactionalId 
stream-soak-test-bbb995dc-1ba2-41ed-8791-0512ab4b904d-1: Invalid transition 
attempted from state FATAL_ERROR to state ABORTABLE_ERROR
        at 
org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:996)
        at 
org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:451)
        at 
org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:664)
        at 
org.apache.kafka.clients.producer.internals.TransactionManager.handleFailedBatch(TransactionManager.java:669)
        at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:835)
        at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:819)
        at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:771)
        at 
org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:702)
        at 
org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:627)
        at java.util.ArrayList.forEach(ArrayList.java:1259)
        at 
org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:612)
        at java.lang.Iterable.forEach(Iterable.java:75)
        at 
org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:612)
        at 
org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$8(Sender.java:917)
        at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
        at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600)
        at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:460)
        at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:337)
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:252)
        at java.lang.Thread.run(Thread.java:750) {code}
If the Producer throws an IllegalStateException on `abortTransaction()`, Kafka 
Streams treats this exception ("correctly") as fatal, and the StreamThread 
dies. However, Kafka Streams is actually in a state it can recover from, and 
thus should not let the StreamThread die but carry forward instead (by not 
calling `abortTransaction()` and moving on with the dirty close of the task).

  was:
https://issues.apache.org/jira/browse/KAFKA-14831 fixed a producer bug in the 
internal TX state transition; the producer now throws an IllegalStateException 
in situations where it previously swallowed an internal error.

This change surfaces a bug in Kafka Streams: Kafka Streams calls 
`abortTransaction()` blindly when a task is closed dirty, even if the Producer 
is already in an internal fatal state. If the Producer is in a fatal state, 
Kafka Streams should instead skip `abortTransaction()` and only `close()` the 
Producer when closing the task dirty.

The bug surfaces after `commitTransaction()` times out and another error 
happens, leading to the call to `abortTransaction()`. Kafka Streams currently 
does not track whether a commit-TX is in progress.

If the Producer throws an IllegalStateException on `abortTransaction()`, Kafka 
Streams treats this exception ("correctly") as fatal, and the StreamThread 
dies. However, Kafka Streams is actually in a state it can recover from, and 
thus should not let the StreamThread die but carry forward instead (by not 
calling `abortTransaction()` and moving on with the dirty close of the task).


> IllegalStateException from Producer
> -----------------------------------
>
>                 Key: KAFKA-16221
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16221
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.6.0
>            Reporter: Matthias J. Sax
>            Priority: Critical
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
