[ https://issues.apache.org/jira/browse/KAFKA-16903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853602#comment-17853602 ]
Bruno Cadonna commented on KAFKA-16903:
---------------------------------------

[~ableegoldman] I actually do not know. I found it on trunk. If you are sure that you found it on an earlier version, please set the affected version accordingly. I actually suspect that it was there before, because we haven't changed that code for a while.

> Task should consider producer error previously occurred for different task
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-16903
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16903
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.7.0
>            Reporter: Bruno Cadonna
>            Assignee: Bruno Cadonna
>            Priority: Major
>             Fix For: 3.8.0
>
>
> A task does not consider a producer error that occurred for a different task.
> The following log messages show the issue.
> Task {{0_2}} of a Streams app (EOSv2 enabled) crashes while sending records with an {{InvalidTxnStateException}}:
> {code:java}
> [2024-05-30 10:20:35,881] ERROR [kafka-producer-network-thread | i-0af25f5c2bd9bba31-StreamThread-1-producer] stream-thread [i-0af25f5c2bd9bba31-StreamThread-1] stream-task [0_2] Error encountered sending record to topic stream-soak-test-node-name-repartition for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
> Exception handler choose to FAIL the processing, no more records would be sent. (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
> [2024-05-30 10:20:35,886] ERROR [i-0af25f5c2bd9bba31-StreamThread-1] stream-thread [i-0af25f5c2bd9bba31-StreamThread-1] Failed to process stream task 0_2 due to the following error: (org.apache.kafka.streams.processor.internals.TaskExecutor)
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic stream-soak-test-node-name-repartition for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
> Exception handler choose to FAIL the processing, no more records would be sent.
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:316)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:285)
>     at org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1565)
>     at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:311)
>     at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:272)
>     at org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:236)
>     at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:829)
>     at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:818)
>     at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:770)
>     at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:702)
>     at org.apache.kafka.clients.producer.internals.Sender.lambda$null$2(Sender.java:627)
>     at java.util.ArrayList.forEach(ArrayList.java:1259)
>     at org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$3(Sender.java:612)
>     at java.lang.Iterable.forEach(Iterable.java:75)
>     at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:612)
>     at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$9(Sender.java:916)
>     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
>     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600)
>     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:348)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:250)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
> {code}
> Shortly before task 0_2 hit this exception, task 0_0 also encountered an exception while producing:
> {code:java}
> [2024-05-30 10:20:35,880] ERROR [kafka-producer-network-thread | i-0af25f5c2bd9bba31-StreamThread-1-producer] stream-thread [i-0af25f5c2bd9bba31-StreamThread-1] stream-task [0_0] Error encountered sending record to topic stream-soak-test-network-id-repartition for task 0_0 due to:
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
> Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
> [2024-05-30 10:20:35,881] INFO [kafka-producer-network-thread | i-0af25f5c2bd9bba31-StreamThread-1-producer] [Producer clientId=i-0af25f5c2bd9bba31-StreamThread-1-producer, transactionalId=stream-soak-test-141294b0-59b9-496e-8857-65a1fe8bac5a-1] Transiting to abortable error state due to org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch. (org.apache.kafka.clients.producer.internals.TransactionManager)
> {code}
> Apparently, task {{0_2}} does not know anything about the exception thrown by task {{0_0}}. Otherwise, task {{0_2}} would not try to produce records and would not run into the {{InvalidTxnStateException}}.
> The root cause is that when a send exception happens, the exception is stored in the {{sendException}} field of that task's {{RecordCollectorImpl}} instance, and there is one instance of {{RecordCollectorImpl}} per task. This means that when one task sets its {{sendException}} field, the other tasks do not know about it.
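The following is a minimal Java sketch of the root cause, for illustration only. {{PerTaskCollector}} and {{SharedCollector}} are hypothetical stand-ins and not the real {{RecordCollectorImpl}}. The first class keeps the send error in a per-instance field, as the issue describes. The second class assumes a single error holder that is shared by all collectors using the same producer. With EOSv2, the tasks of one stream thread share a single producer: both log excerpts show {{i-0af25f5c2bd9bba31-StreamThread-1-producer}}. Because of that, the fencing error seen by task {{0_0}} also causes later sends by task {{0_2}} to fail. The shared variant shows one possible direction only. It is not necessarily how the fix for 3.8.0 was implemented.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the reported behaviour (NOT the real RecordCollectorImpl).
class PerTaskCollector {
    // One field per task: an error recorded here is invisible to other tasks' collectors.
    // volatile because, in the logs, send errors are reported on the producer's network thread
    // while sends are issued from the stream thread.
    private volatile RuntimeException sendException;

    // Invoked from the (simulated) producer send callback when a send fails.
    void onSendError(RuntimeException e) {
        sendException = e;
    }

    // Checked before each send; only sees this task's own error.
    boolean canSend() {
        return sendException == null;
    }
}

// Hypothetical alternative: every collector that uses the same producer shares one holder.
class SharedCollector {
    private final AtomicReference<RuntimeException> sharedSendException;

    SharedCollector(AtomicReference<RuntimeException> sharedSendException) {
        this.sharedSendException = sharedSendException;
    }

    void onSendError(RuntimeException e) {
        // Keep the first error; later errors are typically consequences of it.
        sharedSendException.compareAndSet(null, e);
    }

    boolean canSend() {
        return sharedSendException.get() == null;
    }
}

public class SendErrorVisibility {
    public static void main(String[] args) {
        RuntimeException fenced = new RuntimeException("simulated InvalidProducerEpochException");

        // Per-task field: task 0_0 records the error, but task 0_2 still believes it can send.
        PerTaskCollector task00 = new PerTaskCollector();
        PerTaskCollector task02 = new PerTaskCollector();
        task00.onSendError(fenced);
        System.out.println("per-task: 0_2 can send = " + task02.canSend());

        // Shared holder: the error recorded by 0_0 is visible to 0_2 as well.
        AtomicReference<RuntimeException> shared = new AtomicReference<>();
        SharedCollector shared00 = new SharedCollector(shared);
        SharedCollector shared02 = new SharedCollector(shared);
        shared00.onSendError(fenced);
        System.out.println("shared: 0_2 can send = " + shared02.canSend());
    }
}
```

Running {{SendErrorVisibility.main}} prints {{per-task: 0_2 can send = true}}, which matches the logs, where task 0_2 keeps producing. It then prints {{shared: 0_2 can send = false}}.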