Bruno Cadonna created KAFKA-16903:
-------------------------------------

             Summary: Task should consider a producer error that previously occurred 
for a different task
                 Key: KAFKA-16903
                 URL: https://issues.apache.org/jira/browse/KAFKA-16903
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.7.0
            Reporter: Bruno Cadonna
            Assignee: Bruno Cadonna


A task does not consider a producer error that occurred for a different task.

The following log messages show the issue.

Task {{0_2}} of a Streams app (EOSv2 enabled) crashes while sending records 
with an {{InvalidTxnStateException}}:

{code:java}
[2024-05-30 10:20:35,881] ERROR [kafka-producer-network-thread | 
i-0af25f5c2bd9bba31-StreamThread-1-producer] stream-thread 
[i-0af25f5c2bd9bba31-StreamThread-1] stream-task [0_2] Error encountered 
sending record to topic stream-soak-test-node-name-repartition for task 0_2 due 
to:
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted 
a transactional operation in an invalid state.
Exception handler choose to FAIL the processing, no more records would be sent. 
(org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted 
a transactional operation in an invalid state.

[2024-05-30 10:20:35,886] ERROR [i-0af25f5c2bd9bba31-StreamThread-1] 
stream-thread [i-0af25f5c2bd9bba31-StreamThread-1] Failed to process stream 
task 0_2 due to the following error: 
(org.apache.kafka.streams.processor.internals.TaskExecutor)
org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
record to topic stream-soak-test-node-name-repartition for task 0_2 due to:
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted 
a transactional operation in an invalid state.
Exception handler choose to FAIL the processing, no more records would be sent.
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:316)
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:285)
        at 
org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1565)
        at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:311)
        at 
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:272)
        at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:236)
        at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:829)
        at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:818)
        at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:770)
        at 
org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:702)
        at 
org.apache.kafka.clients.producer.internals.Sender.lambda$null$2(Sender.java:627)
        at java.util.ArrayList.forEach(ArrayList.java:1259)
        at 
org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$3(Sender.java:612)
        at java.lang.Iterable.forEach(Iterable.java:75)
        at 
org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:612)
        at 
org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$9(Sender.java:916)
        at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
        at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600)
        at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:348)
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:250)
        at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The 
producer attempted a transactional operation in an invalid state.
{code} 

Just before the exception of task {{0_2}}, task {{0_0}} also encountered an exception 
while producing:

{code:java}
[2024-05-30 10:20:35,880] ERROR [kafka-producer-network-thread | 
i-0af25f5c2bd9bba31-StreamThread-1-producer] stream-thread 
[i-0af25f5c2bd9bba31-StreamThread-1] stream-task [0_0] Error encountered 
sending record to topic stream-soak-test-network-id-repartition for task 0_0 
due to:
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
attempted to produce with an old epoch.
Written offsets would not be recorded and no more records would be sent since 
the producer is fenced, indicating the task may be migrated out 
(org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
attempted to produce with an old epoch.

[2024-05-30 10:20:35,881] INFO [kafka-producer-network-thread | 
i-0af25f5c2bd9bba31-StreamThread-1-producer] [Producer 
clientId=i-0af25f5c2bd9bba31-StreamThread-1-producer, 
transactionalId=stream-soak-test-141294b0-59b9-496e-8857-65a1fe8bac5a-1] 
Transiting to abortable error state due to 
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
attempted to produce with an old epoch. 
(org.apache.kafka.clients.producer.internals.TransactionManager)
{code} 

Apparently, task {{0_2}} does not know anything about the exception encountered by 
task {{0_0}}; otherwise, task {{0_2}} would not try to produce records and run 
into the {{InvalidTxnStateException}}.

The root cause is that when a send error occurs, the exception is stored in the 
{{sendException}} field of the corresponding {{RecordCollectorImpl}} instance. 
There is one {{RecordCollectorImpl}} instance per task, so when one task sets its 
{{sendException}} field, the other tasks do not know about it.
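
A minimal, self-contained sketch of the situation (a simplified model, not the actual 
Streams code; the {{TaskCollector}} class and its methods are hypothetical stand-ins 
for {{RecordCollectorImpl}}, its {{sendException}} field, and the check done before 
sending):

{code:java}
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for RecordCollectorImpl: one instance per task,
// each with its own slot for the first fatal send error it observes.
class TaskCollector {
    private final AtomicReference<RuntimeException> sendException = new AtomicReference<>();

    // Called from the producer callback when a send fails fatally.
    void onSendError(final RuntimeException fatal) {
        sendException.compareAndSet(null, fatal);
    }

    // Called on the processing path before handing a record to the producer.
    void send(final String record) {
        checkForException(); // only sees errors recorded for *this* task
        // ... hand the record to the (shared) transactional producer ...
    }

    private void checkForException() {
        final RuntimeException exception = sendException.get();
        if (exception != null) {
            throw exception;
        }
    }
}

public class PerTaskErrorDemo {
    public static void main(final String[] args) {
        final TaskCollector collector00 = new TaskCollector(); // collector of task 0_0
        final TaskCollector collector02 = new TaskCollector(); // collector of task 0_2

        // The producer callback for task 0_0 records the fatal send error ...
        collector00.onSendError(new IllegalStateException("producer fenced: old epoch"));

        // ... but task 0_2 only consults its own collector, finds no error, and keeps
        // handing records to the shared transactional producer, which is already in
        // an error state.
        collector02.send("some record"); // does NOT throw here
        System.out.println("task 0_2 kept producing although the shared producer already failed");
    }
}
{code}

Since with EOSv2 all tasks of a stream thread share a single transactional producer, a 
fatal send error recorded for one task arguably needs to become visible to all tasks of 
that thread, for example by keeping the error in a holder shared by all record 
collectors of the thread rather than in one field per {{RecordCollectorImpl}}.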



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
