[ https://issues.apache.org/jira/browse/KAFKA-16077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax updated KAFKA-16077: ------------------------------------ Priority: Blocker (was: Critical) > Streams fails to close task after restoration when input partitions are > updated > ------------------------------------------------------------------------------- > > Key: KAFKA-16077 > URL: https://issues.apache.org/jira/browse/KAFKA-16077 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 3.7.0 > Reporter: Lucas Brutschy > Assignee: Lucas Brutschy > Priority: Blocker > > There is a race condition in the state updater that can cause the following: > # We have an active task in the state updater > # We get fenced. We recreate the producer, so its transactions are now uninitialized. > We ask the state updater to give back the task, and add a pending action to close > the task clean once it’s handed back > # We get a new assignment with updated input partitions. The task is still > owned by the state updater, so we ask the state updater again to hand it back > and add a pending action to update its input partitions > # The task is handed back by the state updater. We update its input > partitions but forget to close it clean (the pending close action was overwritten) > # Now the task is in an initialized state, but the underlying producer does > not have transactions initialized > This can lead to an exception like this: > {code:java} > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log-org.apache.kafka.streams.errors.StreamsException: > Exception caught in process. 
taskId=1_0, > processor=KSTREAM-SOURCE-0000000005, topic=node-name-repartition, > partition=0, offset=618798, stacktrace=java.lang.IllegalStateException: > TransactionalId stream-soak-test-d647640a-12e5-4e74-a0af-e105d0d0cb67-2: > Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:999) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:985) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:311) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:660) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.StreamsProducer.maybeBeginTransaction(StreamsProducer.java:240) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:258) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:253) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.doJoin(KStreamKTableJoinProcessor.java:130) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:99) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:847) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:847) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:778) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1919) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:953) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645){code} > This affects EOSv2 only. -- This message was sent by Atlassian Jira (v8.20.10#820010)