Bruno Cadonna created KAFKA-16903:
-------------------------------------

Summary: Task should consider producer error previously occurred for different task
Key: KAFKA-16903
URL: https://issues.apache.org/jira/browse/KAFKA-16903
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.7.0
Reporter: Bruno Cadonna
Assignee: Bruno Cadonna

A task does not consider a producer error that previously occurred for a different task. The following log messages show the issue. Task {{0_2}} of a Streams app (with EOSv2 enabled) crashes with an {{InvalidTxnStateException}} while sending records:
{code:java}
[2024-05-30 10:20:35,881] ERROR [kafka-producer-network-thread | i-0af25f5c2bd9bba31-StreamThread-1-producer] stream-thread [i-0af25f5c2bd9bba31-StreamThread-1] stream-task [0_2] Error encountered sending record to topic stream-soak-test-node-name-repartition for task 0_2 due to:
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
Exception handler choose to FAIL the processing, no more records would be sent. (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
[2024-05-30 10:20:35,886] ERROR [i-0af25f5c2bd9bba31-StreamThread-1] stream-thread [i-0af25f5c2bd9bba31-StreamThread-1] Failed to process stream task 0_2 due to the following error: (org.apache.kafka.streams.processor.internals.TaskExecutor)
org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic stream-soak-test-node-name-repartition for task 0_2 due to:
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
Exception handler choose to FAIL the processing, no more records would be sent.
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:316)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:285)
    at org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1565)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:311)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:272)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:236)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:829)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:818)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:770)
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:702)
    at org.apache.kafka.clients.producer.internals.Sender.lambda$null$2(Sender.java:627)
    at java.util.ArrayList.forEach(ArrayList.java:1259)
    at org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$3(Sender.java:612)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:612)
    at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$9(Sender.java:916)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:348)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:250)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
{code}
Just before task {{0_2}} failed, task {{0_0}} had also encountered an exception while producing:
{code:java}
[2024-05-30 10:20:35,880] ERROR [kafka-producer-network-thread | i-0af25f5c2bd9bba31-StreamThread-1-producer] stream-thread [i-0af25f5c2bd9bba31-StreamThread-1] stream-task [0_0] Error encountered sending record to topic stream-soak-test-network-id-repartition for task 0_0 due to:
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
[2024-05-30 10:20:35,881] INFO [kafka-producer-network-thread | i-0af25f5c2bd9bba31-StreamThread-1-producer] [Producer clientId=i-0af25f5c2bd9bba31-StreamThread-1-producer, transactionalId=stream-soak-test-141294b0-59b9-496e-8857-65a1fe8bac5a-1] Transiting to abortable error state due to org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch. (org.apache.kafka.clients.producer.internals.TransactionManager)
{code}
Both tasks send through the same producer (client ID {{i-0af25f5c2bd9bba31-StreamThread-1-producer}}), which transitioned to an abortable error state after the error for task {{0_0}}. Apparently, task {{0_2}} does not know anything about the exception thrown for task {{0_0}}; otherwise, task {{0_2}} would not try to produce records and run into the {{InvalidTxnStateException}}.

The root cause is that when a send exception happens, the exception is stored in the field {{sendException}} of the {{RecordCollectorImpl}} instance that issued the send. There is one instance of {{RecordCollectorImpl}} per task. That means that when one task sets its {{sendException}} field, the other tasks do not know about it, even though they share the producer.

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
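The root cause can be illustrated with a simplified, hypothetical model; this is not Kafka's code and not necessarily the fix adopted for this ticket. {{PerTaskCollector}} stands in for a {{RecordCollectorImpl}} with a per-instance {{sendException}} field, so an error recorded for task {{0_0}} stays invisible to task {{0_2}}. {{SharedErrorCollector}} sketches one possible alternative, assuming all collectors of a stream thread share a single error holder (a {{java.util.concurrent.atomic.AtomicReference}}), mirroring the shared producer. All class and method names, and the use of {{IllegalStateException}} in place of {{InvalidProducerEpochException}}, are assumptions for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of KAFKA-16903. These classes are NOT Kafka's
// RecordCollectorImpl; IllegalStateException stands in for InvalidProducerEpochException.
public class SendErrorSketch {

    // Buggy variant: one collector per task, each with its own error field.
    static final class PerTaskCollector {
        private volatile RuntimeException sendException; // set and read by this task only

        // Called from the (simulated) producer callback when a send for this task fails.
        void onSendError(RuntimeException e) {
            sendException = e;
        }

        // Checked before each send; only sees errors recorded by this same instance.
        void checkForException() {
            RuntimeException e = sendException;
            if (e != null) {
                throw e;
            }
        }
    }

    // Alternative variant (assumption): all collectors of one thread share one error holder,
    // just as they share one producer.
    static final class SharedErrorCollector {
        private final AtomicReference<RuntimeException> sharedSendException;

        SharedErrorCollector(AtomicReference<RuntimeException> sharedSendException) {
            this.sharedSendException = sharedSendException;
        }

        void onSendError(RuntimeException e) {
            // Keep the first error; later errors are likely consequences of it.
            sharedSendException.compareAndSet(null, e);
        }

        void checkForException() {
            RuntimeException e = sharedSendException.get();
            if (e != null) {
                throw e;
            }
        }
    }

    // Returns true if task 0_2 notices the error recorded for task 0_0 (per-task fields).
    static boolean otherTaskSeesErrorWithPerTaskField() {
        PerTaskCollector task00 = new PerTaskCollector();
        PerTaskCollector task02 = new PerTaskCollector();
        task00.onSendError(new IllegalStateException("old epoch (simulated)"));
        return throwsOnCheck(task02::checkForException);
    }

    // Returns true if task 0_2 notices the error recorded for task 0_0 (shared holder).
    static boolean otherTaskSeesErrorWithSharedHolder() {
        AtomicReference<RuntimeException> shared = new AtomicReference<>();
        SharedErrorCollector task00 = new SharedErrorCollector(shared);
        SharedErrorCollector task02 = new SharedErrorCollector(shared);
        task00.onSendError(new IllegalStateException("old epoch (simulated)"));
        return throwsOnCheck(task02::checkForException);
    }

    private static boolean throwsOnCheck(Runnable check) {
        try {
            check.run();
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("per-task field: task 0_2 sees error = " + otherTaskSeesErrorWithPerTaskField());
        System.out.println("shared holder:  task 0_2 sees error = " + otherTaskSeesErrorWithSharedHolder());
    }
}
```

Running {{main}} prints {{false}} for the per-task field and {{true}} for the shared holder. The shared variant keeps only the first error because, as in the logs above, the later {{InvalidTxnStateException}} of task {{0_2}} appears to be a consequence of the earlier error for task {{0_0}}.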