loicgreffier commented on PR #16093: URL: https://github.com/apache/kafka/pull/16093#issuecomment-2197739588
@cadonna PR updated. `FailedProcessingException` has been introduced. Note that: ➡️ 1. By default, `FailedProcessingException` appears in the logs: ```console 2024-06-28T23:03:34.862+02:00 ERROR 31724 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [streams-map] Encountered the following exception during processing and sent shutdown request for the entire application. org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=PERSON_TOPIC, partition=0, offset=0, stacktrace=org.apache.kafka.streams.processor.internals.FailedProcessingException: java.lang.RuntimeException: Something bad happened... at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:217) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) at 
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229) at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:95) at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:848) at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:848) at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:778) at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97) at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78) at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1982) at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1000) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) Caused by: java.lang.RuntimeException: Something bad happened... at com.example.kstreamplify.sandbox.MyKafkaStreams.lambda$topology$2(MyKafkaStreams.java:32) at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172) ... 
26 more at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:804) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1982) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1000) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] Caused by: org.apache.kafka.streams.processor.internals.FailedProcessingException: java.lang.RuntimeException: Something bad happened... 
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:217) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:95) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:848) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:848) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:778) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] ... 6 common frames omitted Caused by: java.lang.RuntimeException: Something bad happened... at com.example.kstreamplify.sandbox.MyKafkaStreams.lambda$topology$2(MyKafkaStreams.java:32) ~[classes/:na] at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na] ... 26 common frames omitted ``` In case of `FailedProcessingException`, the cause is used to create the `StreamsException` so the stack trace stays the same before and after the PR: ```java Throwable processingException = e instanceof FailedProcessingException ? e.getCause() : e; final StreamsException error = new StreamsException( String.format( "Exception caught in process. 
taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s", id(), processorContext.currentNode().name(), record.topic(), record.partition(), record.offset(), getStacktraceString(processingException) ), processingException ); ```

---

➡️ 2. The processing exception handler logs:

```console
2024-06-28T23:03:34.743+02:00 WARN 31724 --- [-StreamThread-1] s.e.LogAndFailProcessingExceptionHandler : Exception caught during message processing, processor node: KSTREAM-MAPVALUES-0000000003, taskId: 0_0, source topic: PERSON_TOPIC, source partition: 0, source offset: 0
```

While the `TaskExecutor` logs:

```console
2024-06-28T23:58:14.337+02:00 ERROR 28932 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [streams-map] Encountered the following exception during processing and sent shutdown request for the entire application.
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=PERSON_TOPIC, partition=0, offset=0, stacktrace=java.lang.RuntimeException: Something bad happened...
```

The two messages attribute the same exception to different processor nodes: the handler reports `KSTREAM-MAPVALUES-0000000003`, while the `StreamsException` message reports `KSTREAM-SOURCE-0000000000`. Maybe this could be improved (or not) in another PR: the `FailedProcessingException` could be used to pass the precise name of the processor node where the exception occurred to `StreamTask#process`.
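To make the last idea concrete, here is a minimal, self-contained sketch of how a wrapper exception could carry the failing node name while the original cause is still the one reported. It is not the code from this PR: the nested `FailedProcessingException` and `StreamsException` classes are local stand-ins for the Kafka ones, and the `failedProcessorNodeName` field, the `toStreamsException` helper and the shortened message format are hypothetical.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

public class UnwrapSketch {

    // Local stand-in, NOT the Kafka class. The node-name field is a hypothetical addition.
    static class FailedProcessingException extends RuntimeException {
        private final String failedProcessorNodeName;

        FailedProcessingException(final String failedProcessorNodeName, final Throwable cause) {
            super(cause);
            this.failedProcessorNodeName = failedProcessorNodeName;
        }

        String failedProcessorNodeName() {
            return failedProcessorNodeName;
        }
    }

    // Local stand-in, NOT the Kafka class.
    static class StreamsException extends RuntimeException {
        StreamsException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Renders a throwable's stack trace as a string.
    static String stacktraceString(final Throwable t) {
        final StringWriter writer = new StringWriter();
        t.printStackTrace(new PrintWriter(writer, true));
        return writer.toString();
    }

    // Hypothetical helper: unwraps the wrapper so the cause is reported, and
    // prefers the node name carried by the wrapper over the current (source) node.
    static StreamsException toStreamsException(final String currentNodeName, final Throwable e) {
        final boolean wrapped = e instanceof FailedProcessingException;
        final Throwable processingException = wrapped ? e.getCause() : e;
        final String nodeName = wrapped
            ? ((FailedProcessingException) e).failedProcessorNodeName()
            : currentNodeName;
        return new StreamsException(
            String.format(
                "Exception caught in process. processor=%s, stacktrace=%s",
                nodeName,
                stacktraceString(processingException)
            ),
            processingException
        );
    }

    public static void main(final String[] args) {
        final RuntimeException root = new RuntimeException("Something bad happened...");
        final Throwable wrapper =
            new FailedProcessingException("KSTREAM-MAPVALUES-0000000003", root);
        final StreamsException error =
            toStreamsException("KSTREAM-SOURCE-0000000000", wrapper);
        System.out.println(error.getMessage());
    }
}
```

With this shape, the `StreamsException` would name the same node as the handler log, and its cause would still be the user's exception rather than the wrapper.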