loicgreffier commented on PR #16093:
URL: https://github.com/apache/kafka/pull/16093#issuecomment-2197739588

   @cadonna PR updated. `FailedProcessingException` has been introduced.
   
   Note that:
   ➡️ 1. By default, `FailedProcessingException` appears in the logs:
   
   ```console
   2024-06-28T23:03:34.862+02:00 ERROR 31724 --- [-StreamThread-1] 
org.apache.kafka.streams.KafkaStreams    : stream-client [streams-map] 
Encountered the following exception during processing and sent shutdown request 
for the entire application.
   
   org.apache.kafka.streams.errors.StreamsException: Exception caught in 
process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=PERSON_TOPIC, 
partition=0, offset=0, 
stacktrace=org.apache.kafka.streams.processor.internals.FailedProcessingException:
 java.lang.RuntimeException: Something bad happened...
        at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:217)
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292)
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271)
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
        at 
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
        at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172)
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292)
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271)
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
        at 
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
        at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172)
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292)
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271)
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
        at 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:95)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:848)
        at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:848)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:778)
        at 
org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
        at 
org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
        at 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1982)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1000)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
   Caused by: java.lang.RuntimeException: Something bad happened...
        at 
com.example.kstreamplify.sandbox.MyKafkaStreams.lambda$topology$2(MyKafkaStreams.java:32)
        at 
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
        at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172)
        ... 26 more
   
        at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:804)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1982)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1000)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
   Caused by: 
org.apache.kafka.streams.processor.internals.FailedProcessingException: 
java.lang.RuntimeException: Something bad happened...
        at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:217)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:95)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:848)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:848)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:778)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        ... 6 common frames omitted
   Caused by: java.lang.RuntimeException: Something bad happened...
        at 
com.example.kstreamplify.sandbox.MyKafkaStreams.lambda$topology$2(MyKafkaStreams.java:32)
 ~[classes/:na]
        at 
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172)
 ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
        ... 26 common frames omitted
   ```
   
   In case of `FailedProcessingException`, the cause is used to create the 
`StreamsException` so the stack trace stays the same before and after the PR:
   
   ```java
   Throwable processingException = e instanceof FailedProcessingException ? 
e.getCause() : e;
   
   final StreamsException error = new StreamsException(
       String.format(
           "Exception caught in process. taskId=%s, processor=%s, topic=%s, 
partition=%d, offset=%d, stacktrace=%s",
           id(),
           processorContext.currentNode().name(),
           record.topic(),
           record.partition(),
           record.offset(),
           getStacktraceString(processingException)
       ),
       processingException
   ```
   
   --- 
   
   ➡️ 2. The processing exception handler logs:
   
   ```console
   2024-06-28T23:03:34.743+02:00  WARN 31724 --- [-StreamThread-1] 
s.e.LogAndFailProcessingExceptionHandler : Exception caught during message 
processing, processor node: KSTREAM-MAPVALUES-0000000003, taskId: 0_0, source 
topic: PERSON_TOPIC, source partition: 0, source offset: 0
   ```
   
   While le `TaskExecutor` logs:
   
   ```console
   2024-06-28T23:58:14.337+02:00 ERROR 28932 --- [-StreamThread-1] 
org.apache.kafka.streams.KafkaStreams    : stream-client [streams-map] 
Encountered the following exception during processing and sent shutdown request 
for the entire application.
   
   org.apache.kafka.streams.errors.StreamsException: Exception caught in 
process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=PERSON_TOPIC, 
partition=0, offset=0, stacktrace=java.lang.RuntimeException: Something bad 
happened...
   ```
   
   Both log that an exception occurred in different processors.
   
   Maybe this could be improved (or not) in another PR. The 
`FailedProcessingException` could be used to pass the precise processor node 
name where the exception occurred to the `StreamTask#process`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to