[ https://issues.apache.org/jira/browse/KAFKA-9310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
John Roesler resolved KAFKA-9310.
---------------------------------
    Resolution: Fixed

> StreamThread may die from recoverable UnknownProducerId exception
> -----------------------------------------------------------------
>
>                 Key: KAFKA-9310
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9310
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.0
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
>             Fix For: 2.4.1
>
>
> We attempted to capture and recover from UnknownProducerId exceptions in KAFKA-9231, but the exception can still be raised wrapped in a KafkaException, which kills the thread.
> For example, see the stack trace:
> {noformat}
> [2019-12-17 00:08:53,064] ERROR [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] stream-thread [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: (org.apache.kafka.streams.processor.internals.StreamThread)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_1, processor=KSTREAM-SOURCE-0000000031, topic=windowed-node-counts, partition=1, offset=354933575, stacktrace=org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort sending since an error caught with a previous record (timestamp 1575857317197) to topic stream-soak-test-windowed-node-counts-STATE-STORE-0000000030-changelog due to org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors.
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:247)
>     at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore.log(ChangeLoggingTimestampedKeyValueBytesStore.java:35)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:87)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
>     at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
>     at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
>     at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:224)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:205)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:36)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:242)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.measureLatency(MeteredKeyValueStore.java:356)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:242)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
>     at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:106)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
>     at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
> Caused by: org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors.
>     at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:356)
>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926)
>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:171)
>     ... 29 more
> Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:446)
>     at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort sending since an error caught with a previous record (timestamp 1575857317197) to topic stream-soak-test-windowed-node-counts-STATE-STORE-0000000030-changelog due to org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors.
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:247)
>     at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore.log(ChangeLoggingTimestampedKeyValueBytesStore.java:35)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:87)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
>     at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
>     at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
>     at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:224)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:205)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:36)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:242)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.measureLatency(MeteredKeyValueStore.java:356)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:242)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
>     at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:106)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
>     ... 5 more
> Caused by: org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors.
>     at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:356)
>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926)
>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:171)
>     ... 29 more
> Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question.
> This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
> [2019-12-17 00:08:53,066] INFO [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] stream-thread [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] State transition from RUNNING to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread)
> {noformat}
> The catch blocks should be updated to expect the exception in this wrapped form.
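>
> For illustration, a minimal sketch of the kind of cause-chain check the catch blocks need (hypothetical, not the actual patch; the class and helper names are invented): since the broker error now surfaces as the cause of a wrapping KafkaException rather than being thrown directly, the handler has to walk getCause() instead of catching UnknownProducerIdException alone.
> {code:java}
> import org.apache.kafka.common.KafkaException;
> import org.apache.kafka.common.errors.UnknownProducerIdException;
>
> public final class UnknownProducerIdCheck {
>
>     // Hypothetical helper (not the actual fix): walk the cause chain,
>     // since the broker error arrives wrapped in a KafkaException
>     // instead of being thrown directly.
>     static boolean hasUnknownProducerIdCause(Throwable error) {
>         for (Throwable cause = error; cause != null; cause = cause.getCause()) {
>             if (cause instanceof UnknownProducerIdException) {
>                 return true;
>             }
>         }
>         return false;
>     }
>
>     public static void main(String[] args) {
>         // Simulate the wrapped form shown in the stack trace above.
>         KafkaException wrapped = new KafkaException(
>             "Cannot perform send because at least one previous transactional "
>                 + "or idempotent request has failed with errors.",
>             new UnknownProducerIdException("producer metadata not found"));
>
>         System.out.println(hasUnknownProducerIdCause(wrapped)); // prints: true
>     }
> }
> {code}
> A catch block that detects such a cause can treat the error as recoverable (e.g. reinitialize the task's producer) instead of letting the StreamThread die.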