[
https://issues.apache.org/jira/browse/KAFKA-16903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias J. Sax resolved KAFKA-16903.
-------------------------------------
Resolution: Fixed
> Task should consider producer error previously occurred for different task
> --------------------------------------------------------------------------
>
> Key: KAFKA-16903
> URL: https://issues.apache.org/jira/browse/KAFKA-16903
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.7.0
> Reporter: Bruno Cadonna
> Assignee: Bruno Cadonna
> Priority: Major
> Fix For: 3.8.0
>
>
> A task does not consider a producer error that occurred for a different task.
> The following log messages show the issue.
> Task {{0_2}} of a Streams app (EOSv2 enabled) crashes while sending records
> with an {{InvalidTxnStateException}}:
> {code:java}
> [2024-05-30 10:20:35,881] ERROR [kafka-producer-network-thread |
> i-0af25f5c2bd9bba31-StreamThread-1-producer] stream-thread
> [i-0af25f5c2bd9bba31-StreamThread-1] stream-task [0_2] Error encountered
> sending record to topic stream-soak-test-node-name-repartition for task 0_2
> due to:
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer
> attempted a transactional operation in an invalid state.
> Exception handler choose to FAIL the processing, no more records would be
> sent. (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer
> attempted a transactional operation in an invalid state.
> [2024-05-30 10:20:35,886] ERROR [i-0af25f5c2bd9bba31-StreamThread-1]
> stream-thread [i-0af25f5c2bd9bba31-StreamThread-1] Failed to process stream
> task 0_2 due to the following error:
> (org.apache.kafka.streams.processor.internals.TaskExecutor)
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending
> record to topic stream-soak-test-node-name-repartition for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer
> attempted a transactional operation in an invalid state.
> Exception handler choose to FAIL the processing, no more records would be
> sent.
> at
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:316)
> at
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:285)
> at
> org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1565)
> at
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:311)
> at
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:272)
> at
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:236)
> at
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:829)
> at
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:818)
> at
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:770)
> at
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:702)
> at
> org.apache.kafka.clients.producer.internals.Sender.lambda$null$2(Sender.java:627)
> at java.util.ArrayList.forEach(ArrayList.java:1259)
> at
> org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$3(Sender.java:612)
> at java.lang.Iterable.forEach(Iterable.java:75)
> at
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:612)
> at
> org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$9(Sender.java:916)
> at
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
> at
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600)
> at
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:348)
> at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:250)
> at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The
> producer attempted a transactional operation in an invalid state.
> {code}
> Just before task {{0_2}} failed, task {{0_0}} also encountered an
> exception while producing:
> {code:java}
> [2024-05-30 10:20:35,880] ERROR [kafka-producer-network-thread |
> i-0af25f5c2bd9bba31-StreamThread-1-producer] stream-thread
> [i-0af25f5c2bd9bba31-StreamThread-1] stream-task [0_0] Error encountered
> sending record to topic stream-soak-test-network-id-repartition for task 0_0
> due to:
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
> attempted to produce with an old epoch.
> Written offsets would not be recorded and no more records would be sent since
> the producer is fenced, indicating the task may be migrated out
> (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
> attempted to produce with an old epoch.
> [2024-05-30 10:20:35,881] INFO [kafka-producer-network-thread |
> i-0af25f5c2bd9bba31-StreamThread-1-producer] [Producer
> clientId=i-0af25f5c2bd9bba31-StreamThread-1-producer,
> transactionalId=stream-soak-test-141294b0-59b9-496e-8857-65a1fe8bac5a-1]
> Transiting to abortable error state due to
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
> attempted to produce with an old epoch.
> (org.apache.kafka.clients.producer.internals.TransactionManager)
> {code}
> Apparently, task {{0_2}} does not know anything about the exception thrown by
> task {{0_0}}; otherwise task {{0_2}} would not try to produce records and run
> into the {{InvalidTxnStateException}}. Both log excerpts come from the same
> producer ({{i-0af25f5c2bd9bba31-StreamThread-1-producer}}), which had already
> transitioned to an abortable error state.
> The root cause is that when a send exception happens, the exception is stored
> in the field {{sendException}} of the {{RecordCollectorImpl}} instance that
> encountered it. There is one instance of {{RecordCollectorImpl}} per task, so
> when one task sets its {{sendException}} field, the other tasks do not know
> about it.
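
The root cause described above can be illustrated with a minimal sketch. This is not the Kafka Streams code and not necessarily how the fix was implemented; the classes {{PerTaskCollector}} and {{SharedErrorCollector}} are hypothetical stand-ins. The sketch assumes that the only relevant behavior is "remember the first send error and refuse further sends", and contrasts an error stored in a per-instance field with an error holder shared by all collectors that use the same producer.

```java
import java.util.concurrent.atomic.AtomicReference;

public class SendErrorSketch {

    // Hypothetical stand-in for a per-task collector: the send error lives in
    // an instance field, so it is invisible to the collectors of other tasks.
    static class PerTaskCollector {
        private volatile RuntimeException sendException;

        void onSendFailure(RuntimeException e) {
            sendException = e;
        }

        boolean canSend() {
            return sendException == null;
        }
    }

    // Hypothetical alternative: every collector that uses the same producer
    // receives the same error holder, so the first error is seen by all of them.
    static class SharedErrorCollector {
        private final AtomicReference<RuntimeException> sharedSendException;

        SharedErrorCollector(AtomicReference<RuntimeException> sharedSendException) {
            this.sharedSendException = sharedSendException;
        }

        void onSendFailure(RuntimeException e) {
            // Keep the first error; later errors are usually follow-up failures.
            sharedSendException.compareAndSet(null, e);
        }

        boolean canSend() {
            return sharedSendException.get() == null;
        }
    }

    public static void main(String[] args) {
        // Per-task field: task 0_0 fails, task 0_2 still believes it may send.
        PerTaskCollector task00 = new PerTaskCollector();
        PerTaskCollector task02 = new PerTaskCollector();
        task00.onSendFailure(new IllegalStateException("old epoch"));
        System.out.println("per-task field, 0_2 can send: " + task02.canSend());

        // Shared holder: the error recorded for task 0_0 also stops task 0_2.
        AtomicReference<RuntimeException> shared = new AtomicReference<>();
        SharedErrorCollector shared00 = new SharedErrorCollector(shared);
        SharedErrorCollector shared02 = new SharedErrorCollector(shared);
        shared00.onSendFailure(new IllegalStateException("old epoch"));
        System.out.println("shared holder, 0_2 can send: " + shared02.canSend());
    }
}
```

With the per-task field, the second collector keeps sending after the first one has failed, which matches the sequence in the logs: task {{0_0}} records the {{InvalidProducerEpochException}}, and task {{0_2}} then sends on the same producer and hits the {{InvalidTxnStateException}}.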