[ 
https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853611#comment-17853611
 ] 

Bruno Cadonna commented on KAFKA-14567:
---------------------------------------

[~mjsax] I am not completely sure, but it could be. The sequence of errors 
differs from this case, but the cause might be the same.
The following log messages suggest that it might be the same issue:

{code}
[2024-06-08 10:31:23,354] INFO [kafka-producer-network-thread | i-012e82bd04cc837df-StreamThread-2-producer] [Producer clientId=i-012e82bd04cc837df-StreamThread-2-producer, transactionalId=stream-soak-test-020e3a71-d0f9-49ff-adb5-9d8e6e4354b1-2] Transiting to fatal error state due to org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one. (org.apache.kafka.clients.producer.internals.TransactionManager)
[2024-06-08 10:31:23,354] INFO [kafka-producer-network-thread | i-012e82bd04cc837df-StreamThread-2-producer] [Producer clientId=i-012e82bd04cc837df-StreamThread-2-producer, transactionalId=stream-soak-test-020e3a71-d0f9-49ff-adb5-9d8e6e4354b1-2] Transiting to abortable error state due to org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch. (org.apache.kafka.clients.producer.internals.TransactionManager
{code}

The same producer first transitions to an abortable state and then to a fatal 
state. Each of these sets the {{sendException}} field in {{RecordCollectorImpl}}. 
If the task that causes the abortable error saw the fatal error before 
attempting the operation set by the other task, it would not attempt the 
operation.
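
To make the reasoning above concrete, here is a minimal sketch of the 
suspected interaction. It is only a toy model under my assumptions: 
{{ToyTransactionState}}, {{ToyCollector}} and {{guardedSend}} are hypothetical 
names, not the actual {{TransactionManager}} or {{RecordCollectorImpl}} code. 
It shows that a task which checks the shared error field first skips the 
operation, while an unguarded attempt runs into the invalid transition.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model for illustration only. None of these classes
// are Kafka's; they only mimic the state names that appear in the logs.
final class FencedSendSketch {

    enum State { READY, ABORTABLE_ERROR, FATAL_ERROR }

    // Stand-in for the producer-side transaction state machine.
    static final class ToyTransactionState {
        private State state = State.READY;

        synchronized void transitionTo(State target) {
            // FATAL_ERROR is terminal in this model: leaving it is rejected,
            // which mirrors the "Invalid transition attempted" message in the issue.
            if (state == State.FATAL_ERROR && target != State.FATAL_ERROR) {
                throw new IllegalStateException(
                    "Invalid transition attempted from state " + state + " to state " + target);
            }
            state = target;
        }

        synchronized State current() {
            return state;
        }
    }

    // Stand-in for a record collector whose error field is visible to every
    // task that shares the same producer.
    static final class ToyCollector {
        private final ToyTransactionState txn;
        private final AtomicReference<RuntimeException> sendException = new AtomicReference<>();

        ToyCollector(ToyTransactionState txn) {
            this.txn = txn;
        }

        // Simulates a producer callback reporting a failed send.
        void onSendError(RuntimeException error, State resultingState) {
            txn.transitionTo(resultingState);
            sendException.set(error);
        }

        // Guarded attempt: returns false without touching the state machine once
        // any earlier error has been recorded. errorIfAttempted is the state this
        // simulated operation would push the producer into if it ran.
        boolean guardedSend(State errorIfAttempted) {
            if (sendException.get() != null) {
                return false;
            }
            txn.transitionTo(errorIfAttempted);
            return true;
        }
    }

    public static void main(String[] args) {
        ToyTransactionState txn = new ToyTransactionState();
        ToyCollector collector = new ToyCollector(txn);

        // Abortable error first, then the fatal one, as described above.
        collector.onSendError(new RuntimeException("old epoch"), State.ABORTABLE_ERROR);
        collector.onSendError(new RuntimeException("fenced"), State.FATAL_ERROR);

        // The guard sees the recorded error and skips the operation.
        System.out.println("attempted: " + collector.guardedSend(State.ABORTABLE_ERROR));
        System.out.println("state: " + txn.current());

        // Without the guard, the same operation hits the invalid transition.
        try {
            txn.transitionTo(State.ABORTABLE_ERROR);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Whether the real code paths behave like this toy model is exactly the part I 
am unsure about.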

But, as I said, I am not 100% sure. 

> Kafka Streams crashes after ProducerFencedException
> ---------------------------------------------------
>
>                 Key: KAFKA-14567
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14567
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.7.0
>            Reporter: Matthias J. Sax
>            Assignee: Kirk True
>            Priority: Blocker
>              Labels: eos
>             Fix For: 3.8.0
>
>
> Running a Kafka Streams application with EOS-v2.
> We first see a `ProducerFencedException`. After the fencing, the fenced 
> thread crashed resulting in a non-recoverable error:
> {quote}[2022-12-22 13:49:13,423] ERROR [i-0c291188ec2ae17a0-StreamThread-3] 
> stream-thread [i-0c291188ec2ae17a0-StreamThread-3] Failed to process stream 
> task 1_2 due to the following error: 
> (org.apache.kafka.streams.processor.internals.TaskExecutor)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_2, processor=KSTREAM-SOURCE-0000000005, 
> topic=node-name-repartition, partition=2, offset=539776276, 
> stacktrace=java.lang.IllegalStateException: TransactionalId 
> stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition 
> attempted from state FATAL_ERROR to state ABORTABLE_ERROR
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394)
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620)
>         at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079)
>         at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959)
>         at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257)
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207)
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
>         at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
>         at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88)
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
>         at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791)
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867)
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791)
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722)
>         at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
>         at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569)
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:748)
>         at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
>         at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569)
> Caused by: java.lang.IllegalStateException: TransactionalId 
> stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition 
> attempted from state FATAL_ERROR to state ABORTABLE_ERROR
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394)
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620)
>         at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079)
>         at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959)
>         at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257)
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207)
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
>         at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
>         at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88)
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
>         at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791)
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867)
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791)
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722)
>         ... 6 more
> {quote}
> It seems we try to call `send()` after the producer was fenced. However, 
> after a producer was fenced, we should close all tasks dirty, and try to 
> rejoin the group, and should not call `send()` on the already fenced producer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
