[ 
https://issues.apache.org/jira/browse/KAFKA-12774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344537#comment-17344537
 ] 

Jørgen commented on KAFKA-12774:
--------------------------------

It actually works as expected when throwing from selectKey 👍

So it seems like a real cornercase. Here is the log when throwing from 
selectKey (formatted as json as expected):
{code:java}
{"instant":{"epochSecond":1620990806,"nanoOfSecond":307831000},"thread":"sb-gp-ms-template-a66ab1de-100a-43a3-ba73-1ea077ed95b5-StreamThread-1","level":"INFO","loggerName":"org.example.kafka.KafkaConfig","message":"Peeked
 key=key1 
value=value1","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{},"threadId":111,"threadPriority":5,"timestamp":"2021-05-14T13:13:26.307+0200"}
{"instant":{"epochSecond":1620990806,"nanoOfSecond":309238000},"thread":"sb-gp-ms-template-a66ab1de-100a-43a3-ba73-1ea077ed95b5-StreamThread-1","level":"ERROR","loggerName":"org.apache.kafka.streams.processor.internals.TaskManager","message":"stream-thread
 [sb-gp-ms-template-a66ab1de-100a-43a3-ba73-1ea077ed95b5-StreamThread-1] Failed 
to process stream task 0_0 due to the following 
error:","thrown":{"commonElementCount":0,"localizedMessage":"Exception caught 
in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=foo, 
partition=0, offset=0, stacktrace=java.lang.RuntimeException: 
test-exception\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31)\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18)\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)\n\tat
 
org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)\n\tat
 
org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat
 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)\n","message":"Exception
 caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=foo, 
partition=0, offset=0, stacktrace=java.lang.RuntimeException: 
test-exception\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31)\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18)\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)\n\tat
 
org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)\n\tat
 
org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat
 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)\n","name":"org.apache.kafka.streams.errors.StreamsException","cause":{"commonElementCount":4,"localizedMessage":"test-exception","message":"test-exception","name":"java.lang.RuntimeException","extendedStackTrace":"java.lang.RuntimeException:
 test-exception\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31) 
~[main/:?]\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18) 
~[main/:?]\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
 
~[kafka-streams-2.8.0.jar:?]\n"},"extendedStackTrace":"org.apache.kafka.streams.errors.StreamsException:
 Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, 
topic=foo, partition=0, offset=0, stacktrace=java.lang.RuntimeException: 
test-exception\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31)\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18)\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)\n\tat
 
org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)\n\tat
 
org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat
 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)\n\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:758)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
 [kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
 [kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
 [kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
 [kafka-streams-2.8.0.jar:?]\nCaused by: java.lang.RuntimeException: 
test-exception\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31) 
~[main/:?]\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18) 
~[main/:?]\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
 ~[kafka-streams-2.8.0.jar:?]\n\t... 4 
more\n"},"endOfBatch":false,"loggerFqcn":"org.apache.kafka.common.utils.LogContext$LocationAwareKafkaLogger","contextMap":{},"threadId":111,"threadPriority":5,"timestamp":"2021-05-14T13:13:26.309+0200"}
{"instant":{"epochSecond":1620990806,"nanoOfSecond":335554000},"thread":"sb-gp-ms-template-a66ab1de-100a-43a3-ba73-1ea077ed95b5-StreamThread-1","level":"WARN","loggerName":"org.example.kafka.KafkaConfig","message":"Uncaught
 exception 
handled","thrown":{"commonElementCount":0,"localizedMessage":"Exception caught 
in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=foo, 
partition=0, offset=0, stacktrace=java.lang.RuntimeException: 
test-exception\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31)\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18)\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)\n\tat
 
org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)\n\tat
 
org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat
 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)\n","message":"Exception
 caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=foo, 
partition=0, offset=0, stacktrace=java.lang.RuntimeException: 
test-exception\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31)\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18)\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)\n\tat
 
org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)\n\tat
 
org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat
 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)\n","name":"org.apache.kafka.streams.errors.StreamsException","cause":{"commonElementCount":4,"localizedMessage":"test-exception","message":"test-exception","name":"java.lang.RuntimeException","extendedStackTrace":"java.lang.RuntimeException:
 test-exception\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31) 
~[main/:?]\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18) 
~[main/:?]\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
 
~[kafka-streams-2.8.0.jar:?]\n"},"extendedStackTrace":"org.apache.kafka.streams.errors.StreamsException:
 Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, 
topic=foo, partition=0, offset=0, stacktrace=java.lang.RuntimeException: 
test-exception\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31)\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18)\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)\n\tat
 
org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)\n\tat
 
org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat
 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)\n\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:758)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
 [kafka-streams-2.8.0.jar:?]\nCaused by: java.lang.RuntimeException: 
test-exception\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31) 
~[main/:?]\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18) 
~[main/:?]\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
 ~[kafka-streams-2.8.0.jar:?]\n\t... 4 
more\n"},"endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{},"threadId":111,"threadPriority":5,"timestamp":"2021-05-14T13:13:26.335+0200"}
{"instant":{"epochSecond":1620990806,"nanoOfSecond":376998000},"thread":"sb-gp-ms-template-a66ab1de-100a-43a3-ba73-1ea077ed95b5-StreamThread-1","level":"ERROR","loggerName":"org.apache.zookeeper.server.NIOServerCnxnFactory","message":"Thread
 \tStreamsThread threadId: 
sb-gp-ms-template-a66ab1de-100a-43a3-ba73-1ea077ed95b5-StreamThread-1\nTaskManager\n\tMetadataState:\n\tTasks:\n
 died","thrown":{"commonElementCount":0,"localizedMessage":"Exception caught in 
process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=foo, 
partition=0, offset=0, stacktrace=java.lang.RuntimeException: 
test-exception\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31)\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18)\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)\n\tat
 
org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)\n\tat
 
org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat
 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)\n","message":"Exception
 caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=foo, 
partition=0, offset=0, stacktrace=java.lang.RuntimeException: 
test-exception\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31)\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18)\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)\n\tat
 
org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)\n\tat
 
org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat
 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)\n","name":"org.apache.kafka.streams.errors.StreamsException","cause":{"commonElementCount":4,"localizedMessage":"test-exception","message":"test-exception","name":"java.lang.RuntimeException","extendedStackTrace":"java.lang.RuntimeException:
 test-exception\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31) 
~[main/:?]\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18) 
~[main/:?]\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
 
~[kafka-streams-2.8.0.jar:?]\n"},"extendedStackTrace":"org.apache.kafka.streams.errors.StreamsException:
 Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, 
topic=foo, partition=0, offset=0, stacktrace=java.lang.RuntimeException: 
test-exception\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31)\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18)\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)\n\tat
 
org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)\n\tat
 
org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat
 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat
 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)\n\tat
 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)\n\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:758)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
 ~[kafka-streams-2.8.0.jar:?]\nCaused by: java.lang.RuntimeException: 
test-exception\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31) 
~[main/:?]\n\tat 
org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18) 
~[main/:?]\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
 ~[kafka-streams-2.8.0.jar:?]\n\tat 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
 ~[kafka-streams-2.8.0.jar:?]\n\t... 4 
more\n"},"endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{},"threadId":111,"threadPriority":5,"timestamp":"2021-05-14T13:13:26.376+0200"}
 {code}
I've added a small sample-app replicating application-setup (at least 
framework-wise) in case you want to dig any deeper.

Topology: 
[https://github.com/JorgenRingen/kafka-12774_replicate_log_error/blob/main/src/main/kotlin/org/example/kafka/KafkaConfig.kt#L31]

Test: 
https://github.com/JorgenRingen/kafka-12774_replicate_log_error/blob/main/src/test/kotlin/org/example/kafka/embedded/ExampleEmbeddedKafkaTest.kt#L24

> kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through 
> log4j
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-12774
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12774
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0
>            Reporter: Jørgen
>            Priority: Minor
>             Fix For: 3.0.0, 2.8.1
>
>
> When exceptions is handled in the uncaught-exception handler introduced in 
> KS2.8, the logging of the stacktrace doesn't seem to go through the logging 
> framework configured by the application (log4j2 in our case), but gets 
> printed to console "line-by-line".
> All other exceptions logged by kafka-streams go through log4j2 and gets 
> formatted properly according to the log4j2 appender (json in our case). 
> Haven't tested this on other frameworks like logback.
> Application setup:
>  * Spring-boot 2.4.5
>  * Log4j 2.13.3
>  * Slf4j 1.7.30
> Log4j2 appender config:
> {code:java}
> <Appenders>
>     <Console name="Console" target="SYSTEM_OUT">
>         <JSONLayout complete="false" compact="true" eventEol="true" 
> stacktraceAsString="true" properties="true">
>             <KeyValuePair key="timestamp" 
> value="$${date:yyyy-MM-dd'T'HH:mm:ss.SSSZ}"/>
>         </JSONLayout>
>     </Console>
> </Appenders> {code}
> Uncaught exception handler config:
> {code:java}
> kafkaStreams.setUncaughtExceptionHandler { exception ->
>     logger.warn("Uncaught exception handled - replacing thread", exception) 
> // logged properly
>     
> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD
> } {code}
> Stacktrace that gets printed line-by-line:
> {code:java}
> Exception in thread "xxx-f5860dff-9a41-490e-8ab0-540b1a7f9ce4-StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic xxx-repartition for task 3_2 due 
> to:org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.Exception handler choose to FAIL the processing, no more 
> records would be sent.  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:226)
>        at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:196)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1365)
>     at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>      at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>       at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:783)
>       at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:430)
>      at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:315)  
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)    
>   at java.base/java.lang.Thread.run(Unknown Source)Caused by: 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id. {code}
>  
> It's a little bit hard to reproduce as I haven't found any way to trigger 
> uncaught-exception-handler through junit-tests.
> Link to discussion on slack: 
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1620389197436700



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to