This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 09be14bb09d KAFKA-16448: Fix processing exception handler (#16663)
09be14bb09d is described below

commit 09be14bb09dc336f941a7859232094bfb3cb3b96
Author: Sebastien Viale <sebastien.vi...@michelin.com>
AuthorDate: Fri Jul 26 01:17:31 2024 +0200

    KAFKA-16448: Fix processing exception handler (#16663)
    
    Co-authored-by: Dabz <d.gaspar...@gmail.com>
    Co-authored-by: loicgreffier <loic.greff...@michelin.com>
    
    Minor code improvements across different classes, related to the `ProcessingExceptionHandler` implementation (KIP-1033).
    
    Reviewers: Bruno Cadonna <br...@confluent.io>, Matthias J. Sax <matth...@confluent.io>
---
 .../LogAndFailProcessingExceptionHandler.java      |  2 +-
 .../internals/DefaultErrorHandlerContext.java      | 16 ++++-----
 .../internals/FailedProcessingException.java       |  8 ++---
 .../streams/processor/internals/StreamTask.java    | 42 ++++++++++++----------
 4 files changed, 36 insertions(+), 32 deletions(-)
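
For readers who want to try the KIP-1033 `ProcessingExceptionHandler` directly, below is a minimal sketch of a custom handler. It is based only on the handle() signature visible in the diff that follows; the class name and log wording are made up for illustration, and the configure() no-op mirrors the built-in handlers.

    import org.apache.kafka.streams.errors.ErrorHandlerContext;
    import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
    import org.apache.kafka.streams.processor.api.Record;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.Map;

    // Hypothetical example, not part of this commit: log the failure with the
    // metadata exposed by ErrorHandlerContext, then skip the record.
    public class LogAndContinueOnProcessingError implements ProcessingExceptionHandler {
        private static final Logger log = LoggerFactory.getLogger(LogAndContinueOnProcessingError.class);

        @Override
        public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
                                                final Record<?, ?> record,
                                                final Exception exception) {
            log.error("Skipping record after processing error, processor node: {}, taskId: {}, " +
                    "source topic: {}, source partition: {}, source offset: {}",
                context.processorNodeId(), context.taskId(), context.topic(),
                context.partition(), context.offset(), exception);
            return ProcessingHandlerResponse.CONTINUE;
        }

        @Override
        public void configure(final Map<String, ?> configs) {
            // nothing to configure in this sketch
        }
    }

Assuming the config introduced by KIP-1033 (processing.exception.handler), such a handler would be registered via props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueOnProcessingError.class); the LogAndFailProcessingExceptionHandler touched in this patch is the default.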

diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java
index 47fdb09c9c2..9c2cf91c605 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java
@@ -32,7 +32,7 @@ public class LogAndFailProcessingExceptionHandler implements ProcessingException
 
     @Override
     public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
-        log.warn("Exception caught during message processing, " +
+        log.error("Exception caught during message processing, " +
                 "processor node: {}, taskId: {}, source topic: {}, source 
partition: {}, source offset: {}",
             context.processorNodeId(), context.taskId(), context.topic(), 
context.partition(), context.offset(),
             exception);
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java
index fc6b6048cb9..ff79860d77e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java
@@ -53,41 +53,41 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext {
 
     @Override
     public String topic() {
-        return this.topic;
+        return topic;
     }
 
     @Override
     public int partition() {
-        return this.partition;
+        return partition;
     }
 
     @Override
     public long offset() {
-        return this.offset;
+        return offset;
     }
 
     @Override
     public Headers headers() {
-        return this.headers;
+        return headers;
     }
 
     @Override
     public byte[] sourceRawKey() {
-        return this.sourceRawKey;
+        return sourceRawKey;
     }
 
     @Override
     public byte[] sourceRawValue() {
-        return this.sourceRawValue;
+        return sourceRawValue;
     }
 
     @Override
     public String processorNodeId() {
-        return this.processorNodeId;
+        return processorNodeId;
     }
 
     @Override
     public TaskId taskId() {
-        return this.taskId;
+        return taskId;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/internals/FailedProcessingException.java b/streams/src/main/java/org/apache/kafka/streams/errors/internals/FailedProcessingException.java
index 81b2a2d4fb1..25f2ae9f6cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/internals/FailedProcessingException.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/internals/FailedProcessingException.java
@@ -16,16 +16,16 @@
  */
 package org.apache.kafka.streams.errors.internals;
 
-import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.streams.errors.StreamsException;
 
 /**
  * {@link FailedProcessingException} is the top-level exception type generated by Kafka Streams, and indicates errors have
  * occurred during a {@link org.apache.kafka.streams.processor.internals.ProcessorNode ProcessorNode's} processing.
  */
-public class FailedProcessingException extends KafkaException {
+public class FailedProcessingException extends StreamsException {
     private static final long serialVersionUID = 1L;
 
-    public FailedProcessingException(final Throwable throwable) {
-        super(throwable);
+    public FailedProcessingException(final Exception exception) {
+        super(exception);
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 52d40a34915..30b9038aa6a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -800,28 +800,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                 record = null;
                 throw new TaskCorruptedException(Collections.singleton(id));
             }
+        } catch (final FailedProcessingException failedProcessingException) {
+            // Do not keep the failed processing exception in the stack trace
+            handleException(failedProcessingException.getCause());
         } catch (final StreamsException exception) {
             record = null;
             throw exception;
-        } catch (final RuntimeException e) {
-            // Do not keep the failed processing exception in the stack trace
-            final Throwable processingException = e instanceof FailedProcessingException ? e.getCause() : e;
-
-            final StreamsException error = new StreamsException(
-                String.format(
-                    "Exception caught in process. taskId=%s, processor=%s, 
topic=%s, partition=%d, offset=%d, stacktrace=%s",
-                    id(),
-                    processorContext.currentNode().name(),
-                    record.topic(),
-                    record.partition(),
-                    record.offset(),
-                    getStacktraceString(processingException)
-                ),
-                processingException
-            );
-            record = null;
-
-            throw error;
+        } catch (final Exception e) {
+            handleException(e);
         } finally {
             processorContext.setCurrentNode(null);
         }
@@ -829,6 +815,24 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         return true;
     }
 
+    private void handleException(final Throwable e) {
+        final StreamsException error = new StreamsException(
+            String.format(
+                "Exception caught in process. taskId=%s, processor=%s, 
topic=%s, partition=%d, offset=%d, stacktrace=%s",
+                id(),
+                processorContext.currentNode().name(),
+                record.topic(),
+                record.partition(),
+                record.offset(),
+                getStacktraceString(e)
+            ),
+            e
+        );
+        record = null;
+
+        throw error;
+    }
+
     @SuppressWarnings("unchecked")
     private void doProcess(final long wallClockTime) {
         // process the record by passing to the source node of the topology
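
To make the effect of the new FailedProcessingException handling concrete, here is a small hypothetical snippet (not part of the patch). Because StreamTask now calls getCause() before building the StreamsException, the exception surfaced to the application carries the user's original exception as its cause rather than the internal FailedProcessingException wrapper.

    import org.apache.kafka.streams.errors.StreamsException;
    import org.apache.kafka.streams.errors.internals.FailedProcessingException;

    // Hypothetical illustration of the new unwrapping behavior in StreamTask.
    public class UnwrapSketch {
        public static void main(final String[] args) {
            try {
                // internally, a failed processor wraps the user exception
                throw new FailedProcessingException(new IllegalStateException("user processor failed"));
            } catch (final FailedProcessingException failedProcessing) {
                // mirrors handleException(failedProcessingException.getCause()):
                // wrap the original cause, not the FailedProcessingException itself
                final StreamsException surfaced =
                    new StreamsException("Exception caught in process.", failedProcessing.getCause());
                // prints java.lang.IllegalStateException: user processor failed
                System.out.println(surfaced.getCause().toString());
            }
        }
    }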
