[ https://issues.apache.org/jira/browse/KAFKA-16903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853456#comment-17853456 ]

A. Sophie Blee-Goldman commented on KAFKA-16903:
------------------------------------------------

[~cadonna] is this really a regression that only affects 3.7 or has it been 
around for longer and you just happened to catch it in 3.7?

 

Just wondering because we were running some tests on a slightly older version 
(3.4 or 3.6, I don't remember which) and I think we might have seen this.

> Task should consider producer error previously occurred for different task
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-16903
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16903
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.7.0
>            Reporter: Bruno Cadonna
>            Assignee: Bruno Cadonna
>            Priority: Major
>             Fix For: 3.8.0
>
>
> A task does not consider a producer error that occurred for a different task.
> The following log messages show the issue.
> Task {{0_2}} of a Streams app (EOSv2 enabled) crashes while sending records 
> with an {{InvalidTxnStateException}}:
> {code:java}
> [2024-05-30 10:20:35,881] ERROR [kafka-producer-network-thread | i-0af25f5c2bd9bba31-StreamThread-1-producer] stream-thread [i-0af25f5c2bd9bba31-StreamThread-1] stream-task [0_2] Error encountered sending record to topic stream-soak-test-node-name-repartition for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
> Exception handler choose to FAIL the processing, no more records would be sent. (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
> [2024-05-30 10:20:35,886] ERROR [i-0af25f5c2bd9bba31-StreamThread-1] stream-thread [i-0af25f5c2bd9bba31-StreamThread-1] Failed to process stream task 0_2 due to the following error: (org.apache.kafka.streams.processor.internals.TaskExecutor)
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic stream-soak-test-node-name-repartition for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
> Exception handler choose to FAIL the processing, no more records would be sent.
>       at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:316)
>       at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:285)
>       at org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1565)
>       at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:311)
>       at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:272)
>       at org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:236)
>       at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:829)
>       at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:818)
>       at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:770)
>       at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:702)
>       at org.apache.kafka.clients.producer.internals.Sender.lambda$null$2(Sender.java:627)
>       at java.util.ArrayList.forEach(ArrayList.java:1259)
>       at org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$3(Sender.java:612)
>       at java.lang.Iterable.forEach(Iterable.java:75)
>       at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:612)
>       at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$9(Sender.java:916)
>       at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
>       at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
>       at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600)
>       at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:348)
>       at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:250)
>       at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
> {code} 
> Just before task {{0_2}} hit this exception, task {{0_0}} also encountered an 
> exception while producing:
> {code:java}
> [2024-05-30 10:20:35,880] ERROR [kafka-producer-network-thread | i-0af25f5c2bd9bba31-StreamThread-1-producer] stream-thread [i-0af25f5c2bd9bba31-StreamThread-1] stream-task [0_0] Error encountered sending record to topic stream-soak-test-network-id-repartition for task 0_0 due to:
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
> Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
> [2024-05-30 10:20:35,881] INFO [kafka-producer-network-thread | i-0af25f5c2bd9bba31-StreamThread-1-producer] [Producer clientId=i-0af25f5c2bd9bba31-StreamThread-1-producer, transactionalId=stream-soak-test-141294b0-59b9-496e-8857-65a1fe8bac5a-1] Transiting to abortable error state due to org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch. (org.apache.kafka.clients.producer.internals.TransactionManager)
> {code} 
> Apparently, task {{0_2}} does not know anything about the exception encountered 
> by task {{0_0}}. Otherwise, task {{0_2}} would not try to produce records and 
> run into the {{InvalidTxnStateException}}.
> The root cause is that when a send exception occurs, it is stored in the field 
> {{sendException}} of the task's own {{RecordCollectorImpl}} instance, and there 
> is one {{RecordCollectorImpl}} instance per task. With EOSv2, however, all tasks 
> of a stream thread share one producer (both tasks above send through 
> {{i-0af25f5c2bd9bba31-StreamThread-1-producer}}). So when one task sets its 
> {{sendException}} field, the other tasks using the same producer do not know 
> about it.
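
The sketch below illustrates the root cause in a simplified model and does not use the real Kafka Streams classes. The hypothetical {{TaskCollector}} stands in for a per-task {{RecordCollectorImpl}}, a plain {{RuntimeException}} stands in for the {{InvalidProducerEpochException}}, and {{secondTaskSendsAfterFirstFails}} is an illustrative helper. With a per-task holder, task 0_2 keeps sending after task 0_0 has recorded an error. With one holder shared by all tasks that use the same producer, task 0_2 sees the error and stops. The shared holder only illustrates the idea; it is not necessarily how the fix in 3.8.0 is implemented.

{code:java}
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the bug in KAFKA-16903; this is NOT Kafka Streams code.
public class SharedSendErrorDemo {

    // Hypothetical stand-in for one task's record collector.
    private static final class TaskCollector {
        private final String taskId;
        // Holds the first send error; either private to this task or shared with other tasks.
        private final AtomicReference<RuntimeException> sendException;

        TaskCollector(String taskId, AtomicReference<RuntimeException> sendException) {
            this.taskId = taskId;
            this.sendException = sendException;
        }

        // Models the producer callback reporting an asynchronous send failure.
        void onSendError(RuntimeException error) {
            // Keep only the first error; later errors do not overwrite it.
            sendException.compareAndSet(null, error);
        }

        // Models a send attempt: refuse to produce once an earlier error is known.
        boolean trySend() {
            RuntimeException error = sendException.get();
            if (error != null) {
                System.out.println("task " + taskId + " stops sending: " + error.getMessage());
                return false;
            }
            System.out.println("task " + taskId + " sends a record");
            return true;
        }
    }

    // Task 0_0 hits a send error, then task 0_2 tries to send.
    // Returns true if task 0_2 still sends, i.e. it did not see task 0_0's error.
    public static boolean secondTaskSendsAfterFirstFails(boolean shareHolder) {
        AtomicReference<RuntimeException> holder00 = new AtomicReference<RuntimeException>();
        AtomicReference<RuntimeException> holder02 =
            shareHolder ? holder00 : new AtomicReference<RuntimeException>();
        TaskCollector task00 = new TaskCollector("0_0", holder00);
        TaskCollector task02 = new TaskCollector("0_2", holder02);

        task00.onSendError(new RuntimeException("producer fenced (old epoch)"));
        return task02.trySend();
    }

    public static void main(String[] args) {
        boolean perTask = secondTaskSendsAfterFirstFails(false);
        boolean shared = secondTaskSendsAfterFirstFails(true);
        System.out.println("per-task holder: task 0_2 sent = " + perTask);
        System.out.println("shared holder:   task 0_2 sent = " + shared);
    }
}
{code}

Running {{main}} shows task 0_2 sending a record when each task has its own holder. With the shared holder, it stops with "producer fenced (old epoch)".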



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
