[ https://issues.apache.org/jira/browse/KAFKA-12774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344537#comment-17344537 ]
Jørgen commented on KAFKA-12774: -------------------------------- It actually works as expected when throwing from selectKey 👍 So it seems like a real cornercase. Here is the log when throwing from selectKey (formatted as json as expected): {code:java} {"instant":{"epochSecond":1620990806,"nanoOfSecond":307831000},"thread":"sb-gp-ms-template-a66ab1de-100a-43a3-ba73-1ea077ed95b5-StreamThread-1","level":"INFO","loggerName":"org.example.kafka.KafkaConfig","message":"Peeked key=key1 value=value1","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{},"threadId":111,"threadPriority":5,"timestamp":"2021-05-14T13:13:26.307+0200"} {"instant":{"epochSecond":1620990806,"nanoOfSecond":309238000},"thread":"sb-gp-ms-template-a66ab1de-100a-43a3-ba73-1ea077ed95b5-StreamThread-1","level":"ERROR","loggerName":"org.apache.kafka.streams.processor.internals.TaskManager","message":"stream-thread [sb-gp-ms-template-a66ab1de-100a-43a3-ba73-1ea077ed95b5-StreamThread-1] Failed to process stream task 0_0 due to the following error:","thrown":{"commonElementCount":0,"localizedMessage":"Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=foo, partition=0, offset=0, stacktrace=java.lang.RuntimeException: test-exception\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31)\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18)\n\tat org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)\n\tat org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)\n\tat org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)\n\tat org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)\n","message":"Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=foo, partition=0, offset=0, stacktrace=java.lang.RuntimeException: test-exception\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31)\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18)\n\tat org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)\n\tat org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)\n\tat org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)\n\tat org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)\n","name":"org.apache.kafka.streams.errors.StreamsException","cause":{"commonElementCount":4,"localizedMessage":"test-exception","message":"test-exception","name":"java.lang.RuntimeException","extendedStackTrace":"java.lang.RuntimeException: test-exception\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31) ~[main/:?]\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18) ~[main/:?]\n\tat org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731) ~[kafka-streams-2.8.0.jar:?]\n"},"extendedStackTrace":"org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=foo, partition=0, offset=0, stacktrace=java.lang.RuntimeException: test-exception\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31)\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18)\n\tat org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)\n\tat org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)\n\tat org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)\n\tat org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)\n\n\tat org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:758) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177) [kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753) [kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583) [kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556) [kafka-streams-2.8.0.jar:?]\nCaused by: java.lang.RuntimeException: test-exception\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31) ~[main/:?]\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18) ~[main/:?]\n\tat org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731) ~[kafka-streams-2.8.0.jar:?]\n\t... 4 more\n"},"endOfBatch":false,"loggerFqcn":"org.apache.kafka.common.utils.LogContext$LocationAwareKafkaLogger","contextMap":{},"threadId":111,"threadPriority":5,"timestamp":"2021-05-14T13:13:26.309+0200"} {"instant":{"epochSecond":1620990806,"nanoOfSecond":335554000},"thread":"sb-gp-ms-template-a66ab1de-100a-43a3-ba73-1ea077ed95b5-StreamThread-1","level":"WARN","loggerName":"org.example.kafka.KafkaConfig","message":"Uncaught exception handled","thrown":{"commonElementCount":0,"localizedMessage":"Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=foo, partition=0, offset=0, stacktrace=java.lang.RuntimeException: test-exception\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31)\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18)\n\tat org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)\n\tat org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)\n\tat org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)\n\tat org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)\n","message":"Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=foo, partition=0, offset=0, stacktrace=java.lang.RuntimeException: test-exception\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31)\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18)\n\tat org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)\n\tat org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)\n\tat org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)\n\tat org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)\n","name":"org.apache.kafka.streams.errors.StreamsException","cause":{"commonElementCount":4,"localizedMessage":"test-exception","message":"test-exception","name":"java.lang.RuntimeException","extendedStackTrace":"java.lang.RuntimeException: test-exception\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31) ~[main/:?]\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18) ~[main/:?]\n\tat org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731) ~[kafka-streams-2.8.0.jar:?]\n"},"extendedStackTrace":"org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=foo, partition=0, offset=0, stacktrace=java.lang.RuntimeException: test-exception\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31)\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18)\n\tat org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)\n\tat org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)\n\tat org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)\n\tat org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)\n\n\tat org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:758) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556) [kafka-streams-2.8.0.jar:?]\nCaused by: java.lang.RuntimeException: test-exception\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31) ~[main/:?]\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18) ~[main/:?]\n\tat org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731) ~[kafka-streams-2.8.0.jar:?]\n\t... 4 more\n"},"endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{},"threadId":111,"threadPriority":5,"timestamp":"2021-05-14T13:13:26.335+0200"} {"instant":{"epochSecond":1620990806,"nanoOfSecond":376998000},"thread":"sb-gp-ms-template-a66ab1de-100a-43a3-ba73-1ea077ed95b5-StreamThread-1","level":"ERROR","loggerName":"org.apache.zookeeper.server.NIOServerCnxnFactory","message":"Thread \tStreamsThread threadId: sb-gp-ms-template-a66ab1de-100a-43a3-ba73-1ea077ed95b5-StreamThread-1\nTaskManager\n\tMetadataState:\n\tTasks:\n died","thrown":{"commonElementCount":0,"localizedMessage":"Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=foo, partition=0, offset=0, stacktrace=java.lang.RuntimeException: test-exception\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31)\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18)\n\tat org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)\n\tat org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)\n\tat org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)\n\tat org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)\n","message":"Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=foo, partition=0, offset=0, stacktrace=java.lang.RuntimeException: test-exception\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31)\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18)\n\tat org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)\n\tat org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)\n\tat org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)\n\tat org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)\n","name":"org.apache.kafka.streams.errors.StreamsException","cause":{"commonElementCount":4,"localizedMessage":"test-exception","message":"test-exception","name":"java.lang.RuntimeException","extendedStackTrace":"java.lang.RuntimeException: test-exception\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31) ~[main/:?]\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18) ~[main/:?]\n\tat org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731) ~[kafka-streams-2.8.0.jar:?]\n"},"extendedStackTrace":"org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=foo, partition=0, offset=0, stacktrace=java.lang.RuntimeException: test-exception\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31)\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18)\n\tat org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236)\n\tat org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)\n\tat org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)\n\tat org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)\n\tat org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)\n\n\tat org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:758) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556) ~[kafka-streams-2.8.0.jar:?]\nCaused by: java.lang.RuntimeException: test-exception\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:31) ~[main/:?]\n\tat org.example.kafka.KafkaConfig$topology$2.apply(KafkaConfig.kt:18) ~[main/:?]\n\tat org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:236) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[kafka-streams-2.8.0.jar:?]\n\tat org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731) ~[kafka-streams-2.8.0.jar:?]\n\t... 4 more\n"},"endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{},"threadId":111,"threadPriority":5,"timestamp":"2021-05-14T13:13:26.376+0200"} {code} I've added a small sample-app replicating application-setup (at least framework-wise) in case you want to dig any deeper. Topology: [https://github.com/JorgenRingen/kafka-12774_replicate_log_error/blob/main/src/main/kotlin/org/example/kafka/KafkaConfig.kt#L31] Test: https://github.com/JorgenRingen/kafka-12774_replicate_log_error/blob/main/src/test/kotlin/org/example/kafka/embedded/ExampleEmbeddedKafkaTest.kt#L24 > kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through > log4j > -------------------------------------------------------------------------------- > > Key: KAFKA-12774 > URL: https://issues.apache.org/jira/browse/KAFKA-12774 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.8.0 > Reporter: Jørgen > Priority: Minor > Fix For: 3.0.0, 2.8.1 > > > When exceptions is handled in the uncaught-exception handler introduced in > KS2.8, the logging of the stacktrace doesn't seem to go through the logging > framework configured by the application (log4j2 in our case), but gets > printed to console "line-by-line". > All other exceptions logged by kafka-streams go through log4j2 and gets > formatted properly according to the log4j2 appender (json in our case). > Haven't tested this on other frameworks like logback. > Application setup: > * Spring-boot 2.4.5 > * Log4j 2.13.3 > * Slf4j 1.7.30 > Log4j2 appender config: > {code:java} > <Appenders> > <Console name="Console" target="SYSTEM_OUT"> > <JSONLayout complete="false" compact="true" eventEol="true" > stacktraceAsString="true" properties="true"> > <KeyValuePair key="timestamp" > value="$${date:yyyy-MM-dd'T'HH:mm:ss.SSSZ}"/> > </JSONLayout> > </Console> > </Appenders> {code} > Uncaught exception handler config: > {code:java} > kafkaStreams.setUncaughtExceptionHandler { exception -> > logger.warn("Uncaught exception handled - replacing thread", exception) > // logged properly > > StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD > } {code} > Stacktrace that gets printed line-by-line: > {code:java} > Exception in thread "xxx-f5860dff-9a41-490e-8ab0-540b1a7f9ce4-StreamThread-2" > org.apache.kafka.streams.errors.StreamsException: Error encountered sending > record to topic xxx-repartition for task 3_2 due > to:org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id.Exception handler choose to FAIL the processing, no more > records would be sent. at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:226) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:196) > at > org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1365) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:783) > at > org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:430) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:315) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242) > at java.base/java.lang.Thread.run(Unknown Source)Caused by: > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. {code} > > It's a little bit hard to reproduce as I haven't found any way to trigger > uncaught-exception-handler through junit-tests. > Link to discussion on slack: > https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1620389197436700 -- This message was sent by Atlassian Jira (v8.3.4#803005)