[
https://issues.apache.org/jira/browse/KAFKA-9310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
John Roesler resolved KAFKA-9310.
---------------------------------
Resolution: Fixed
> StreamThread may die from recoverable UnknownProducerId exception
> -----------------------------------------------------------------
>
> Key: KAFKA-9310
> URL: https://issues.apache.org/jira/browse/KAFKA-9310
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.4.0
> Reporter: John Roesler
> Assignee: John Roesler
> Priority: Major
> Fix For: 2.4.1
>
>
> We attempted to capture and recover from UnknownProducerId exceptions in
> KAFKA-9231, but the exception can still be raised wrapped in a
> KafkaException, in which case it slips past the catch blocks and kills the
> thread.
> For example, see the stack trace:
> {noformat}
> [2019-12-17 00:08:53,064] ERROR
> [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3]
> stream-thread
> [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3]
> Encountered the following unexpected Kafka exception during processing, this
> usually indicate Streams internal errors:
> (org.apache.kafka.streams.processor.internals.StreamThread)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in
> process. taskId=1_1, processor=KSTREAM-SOURCE-0000000031,
> topic=windowed-node-counts, partition=1, offset=354933575,
> stacktrace=org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort
> sending since an error caught with a previous record (timestamp
> 1575857317197) to topic
> stream-soak-test-windowed-node-counts-STATE-STORE-0000000030-changelog due to
> org.apache.kafka.common.KafkaException: Cannot perform send because at least
> one previous transactional or idempotent request has failed with errors.
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:247)
>     at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore.log(ChangeLoggingTimestampedKeyValueBytesStore.java:35)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:87)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
>     at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
>     at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
>     at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:224)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:205)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:36)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:242)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.measureLatency(MeteredKeyValueStore.java:356)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:242)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
>     at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:106)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
>     at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
> Caused by: org.apache.kafka.common.KafkaException: Cannot perform send
> because at least one previous transactional or idempotent request has failed
> with errors.
>     at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:356)
>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926)
>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:171)
> ... 29 more
> Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This
> exception is raised by the broker if it could not locate the producer
> metadata associated with the producerId in question. This could happen if,
> for instance, the producer's records were deleted because their retention
> time had elapsed. Once the last records of the producerId are removed, the
> producer's metadata is removed from the broker, and future appends by the
> producer will return this exception.
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:446)
>     at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_1]
> Abort sending since an error caught with a previous record (timestamp
> 1575857317197) to topic
> stream-soak-test-windowed-node-counts-STATE-STORE-0000000030-changelog due to
> org.apache.kafka.common.KafkaException: Cannot perform send because at least
> one previous transactional or idempotent request has failed with errors.
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:247)
>     at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore.log(ChangeLoggingTimestampedKeyValueBytesStore.java:35)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:87)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
>     at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
>     at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
>     at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:224)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:205)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:36)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:242)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.measureLatency(MeteredKeyValueStore.java:356)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:242)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
>     at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:106)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
> ... 5 more
> Caused by: org.apache.kafka.common.KafkaException: Cannot perform send
> because at least one previous transactional or idempotent request has failed
> with errors.
>     at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:356)
>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926)
>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:171)
> ... 29 more
> Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This
> exception is raised by the broker if it could not locate the producer
> metadata associated with the producerId in question. This could happen if,
> for instance, the producer's records were deleted because their retention
> time had elapsed. Once the last records of the producerId are removed, the
> producer's metadata is removed from the broker, and future appends by the
> producer will return this exception.
> [2019-12-17 00:08:53,066] INFO
> [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3]
> stream-thread
> [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] State
> transition from RUNNING to PENDING_SHUTDOWN
> (org.apache.kafka.streams.processor.internals.StreamThread)
> {noformat}
> The catch blocks should be updated to handle the exception when it arrives
> wrapped in this form.
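>
> For reference, here is a minimal sketch of the kind of check those catch
> blocks need (illustrative only, not the actual patch; the class name
> CauseChainCheck and the helper isWrappedUnknownProducerId are made up):
> walk the cause chain, so the handler still recognizes
> UnknownProducerIdException when it arrives buried inside a KafkaException
> inside a StreamsException, as in the trace above.
> {code:java}
> import org.apache.kafka.common.KafkaException;
> import org.apache.kafka.common.errors.UnknownProducerIdException;
>
> // Illustrative sketch, not the KAFKA-9310 patch itself.
> public class CauseChainCheck {
>
>     // Walk the cause chain; return true if an UnknownProducerIdException is
>     // buried anywhere in it, however many layers of wrapping sit on top.
>     static boolean isWrappedUnknownProducerId(final Throwable top) {
>         for (Throwable cause = top; cause != null; cause = cause.getCause()) {
>             if (cause instanceof UnknownProducerIdException) {
>                 return true;
>             }
>         }
>         return false;
>     }
>
>     public static void main(final String[] args) {
>         // Rebuild the nesting from the trace above: a producer-side
>         // KafkaException wrapping the broker's UnknownProducerIdException.
>         final KafkaException wrapped = new KafkaException(
>             "Cannot perform send because at least one previous transactional "
>                 + "or idempotent request has failed with errors.",
>             new UnknownProducerIdException("producer metadata expired"));
>
>         System.out.println(isWrappedUnknownProducerId(wrapped)); // true
>     }
> }
> {code}
> A plain instanceof check on the top-level exception misses this case, since
> the UnknownProducerIdException only shows up two levels down the cause
> chain.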
--
This message was sent by Atlassian Jira
(v8.3.4#803005)