[ https://issues.apache.org/jira/browse/KAFKA-9310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler resolved KAFKA-9310.
---------------------------------
    Resolution: Fixed

> StreamThread may die from recoverable UnknownProducerId exception
> -----------------------------------------------------------------
>
>                 Key: KAFKA-9310
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9310
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.0
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
>             Fix For: 2.4.1
>
>
> We attempted to capture and recover from UnknownProducerId exceptions in 
> KAFKA-9231, but the exception can still be raised wrapped in a 
> KafkaException, which kills the thread.
> For example, see the stack trace:
> {noformat}
> [2019-12-17 00:08:53,064] ERROR 
> [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] 
> stream-thread 
> [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] 
> Encountered the following unexpected Kafka exception during processing, this 
> usually indicate Streams internal errors: 
> (org.apache.kafka.streams.processor.internals.StreamThread)
>   org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_1, processor=KSTREAM-SOURCE-0000000031, 
> topic=windowed-node-counts, partition=1, offset=354933575, 
> stacktrace=org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort 
> sending since an error caught with a previous record (timestamp 
> 1575857317197) to topic 
> stream-soak-test-windowed-node-counts-STATE-STORE-0000000030-changelog due to 
> org.apache.kafka.common.KafkaException: Cannot perform send because at least 
> one previous transactional or idempotent request has failed with errors.
>       at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:247)
>       at 
> org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
>       at 
> org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore.log(ChangeLoggingTimestampedKeyValueBytesStore.java:35)
>       at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
>       at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:87)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
>       at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
>       at 
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
>       at 
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
>       at 
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:224)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:205)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:36)
>       at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:242)
>       at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.measureLatency(MeteredKeyValueStore.java:356)
>       at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:242)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
>       at 
> org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:106)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
>       at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
>       at 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
>       at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
>  Caused by: org.apache.kafka.common.KafkaException: Cannot perform send 
> because at least one previous transactional or idempotent request has failed 
> with errors.
>       at 
> org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:356)
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926)
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
>       at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:171)
>       ... 29 more
>  Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This 
> exception is raised by the broker if it could not locate the producer 
> metadata associated with the producerId in question. This could happen if, 
> for instance, the producer's records were deleted because their retention 
> time had elapsed. Once the last records of the producerId are removed, the 
> producer's metadata is removed from the broker, and future appends by the 
> producer will return this exception.
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:446)
>       at 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
>       at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
>  Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_1] 
> Abort sending since an error caught with a previous record (timestamp 
> 1575857317197) to topic 
> stream-soak-test-windowed-node-counts-STATE-STORE-0000000030-changelog due to 
> org.apache.kafka.common.KafkaException: Cannot perform send because at least 
> one previous transactional or idempotent request has failed with errors.
>       at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:247)
>       at 
> org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
>       at 
> org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore.log(ChangeLoggingTimestampedKeyValueBytesStore.java:35)
>       at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
>       at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:87)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
>       at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
>       at 
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
>       at 
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
>       at 
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:224)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:205)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:36)
>       at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:242)
>       at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.measureLatency(MeteredKeyValueStore.java:356)
>       at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:242)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
>       at 
> org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:106)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
>       at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
>       ... 5 more
>  Caused by: org.apache.kafka.common.KafkaException: Cannot perform send 
> because at least one previous transactional or idempotent request has failed 
> with errors.
>       at 
> org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:356)
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926)
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
>       at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:171)
>       ... 29 more
>  Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This 
> exception is raised by the broker if it could not locate the producer 
> metadata associated with the producerId in question. This could happen if, 
> for instance, the producer's records were deleted because their retention 
> time had elapsed. Once the last records of the producerId are removed, the 
> producer's metadata is removed from the broker, and future appends by the 
> producer will return this exception.
>  [2019-12-17 00:08:53,066] INFO 
> [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] 
> stream-thread 
> [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] State 
> transition from RUNNING to PENDING_SHUTDOWN 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> {noformat}
> The catch blocks should be updated to expect the exception in this wrapped 
> form, unwrapping the cause chain to find the UnknownProducerIdException.
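> A minimal sketch of that unwrapping, assuming a hypothetical helper named 
> isRecoverableUnknownProducerId (the class, method name, and placement are 
> illustrative only, not the actual fix in the Streams code):
> {noformat}
> import org.apache.kafka.common.errors.UnknownProducerIdException;
> 
> final class RecoverableErrorCheck {
>     // Walk the full cause chain, since the broker-side
>     // UnknownProducerIdException may be buried under a KafkaException
>     // (and a StreamsException), as in the trace above.
>     static boolean isRecoverableUnknownProducerId(final Throwable error) {
>         Throwable cause = error;
>         while (cause != null) {
>             if (cause instanceof UnknownProducerIdException) {
>                 return true;
>             }
>             // Guard against self-referential cause chains.
>             cause = cause.getCause() == cause ? null : cause.getCause();
>         }
>         return false;
>     }
> }
> {noformat}
> With a check along these lines, the recovery path could be taken no matter 
> how deeply the broker error is wrapped.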



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
