This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new de39c768df8 KAFKA-19930 Updating doc for process exceptional handler 
in case of global threads (#21109)
de39c768df8 is described below

commit de39c768df889f385804c47595a594a8acad59ab
Author: Arpit Goyal <[email protected]>
AuthorDate: Thu Dec 11 10:12:26 2025 +0530

    KAFKA-19930 Updating doc for process exceptional handler in case of global 
threads (#21109)
    
    Cherrypicking from this https://github.com/apache/kafka/pull/21016
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 docs/streams/developer-guide/config-streams.html       |  5 ++++-
 .../java/org/apache/kafka/streams/StreamsConfig.java   |  4 +++-
 .../streams/processor/internals/ProcessorNode.java     |  9 +++++++++
 .../streams/processor/internals/ProcessorNodeTest.java | 18 ++++++++++++++++++
 4 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/docs/streams/developer-guide/config-streams.html 
b/docs/streams/developer-guide/config-streams.html
index 8c45d0c7976..cd9c615ffa6 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -1031,7 +1031,10 @@ rack.aware.assignment.tags: zone,cluster   | 
rack.aware.assignment.tags: zone,cl
             <div><p>The processing exception handler allows you to manage 
exceptions triggered during the processing of a record. The implemented 
exception
               handler needs to return a <code>FAIL</code> or 
<code>CONTINUE</code> depending on the record and the exception thrown. 
Returning
               <code>FAIL</code> will signal that Streams should shut down and 
<code>CONTINUE</code> will signal that Streams should ignore the issue
-              and continue processing. The following library built-in 
exception handlers are available:</p>
+              and continue processing.</p>
+              <p><strong>Note:</strong> This handler applies only to regular 
stream processing tasks. It does not apply to global state store updates
+              (global threads). Exceptions occurring in global threads will 
bubble up to the configured uncaught exception handler.</p>
+              <p>The following library built-in exception handlers are 
available:</p>
               <ul class="simple">
                 <li><a class="reference external" 
href="/{{version}}/javadoc/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.html">LogAndContinueProcessingExceptionHandler</a>:
                   This handler logs the processing exception and then signals 
the processing pipeline to continue processing more records.
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 944446b4bec..9357fc6d395 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -695,7 +695,9 @@ public class StreamsConfig extends AbstractConfig {
     @SuppressWarnings("WeakerAccess")
     public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG = 
"processing.exception.handler";
     @Deprecated
-    public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_DOC = 
"Exception handling class that implements the 
<code>org.apache.kafka.streams.errors.ProcessingExceptionHandler</code> 
interface.";
+    public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_DOC = 
"Exception handling class that implements the 
<code>org.apache.kafka.streams.errors.ProcessingExceptionHandler</code> 
interface. " +
+            "Note: This handler applies only to regular stream processing 
tasks. It does not apply to global state store updates (global threads). " +
+            "Exceptions occurring in global threads will bubble up to the 
configured uncaught exception handler.";
 
     /** {@code processing.guarantee} */
     @SuppressWarnings("WeakerAccess")
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 62173e807fd..955298a2eeb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -207,6 +207,15 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
             // while Java distinguishes checked vs unchecked exceptions, other 
languages
             // like Scala or Kotlin do not, and thus we need to catch 
`Exception`
             // (instead of `RuntimeException`) to work well with those 
languages
+
+            // If the processing exception handler is not set (e.g., for 
global threads),
+            // rethrow the exception to let it bubble up to the uncaught 
exception handler.
+            // The processing exception handler is only set for regular stream 
tasks, not for
+            // global state update tasks which use a different error handling 
mechanism.
+            if (processingExceptionHandler == null) {
+                throw processingException;
+            }
+
             final ErrorHandlerContext errorHandlerContext = new 
DefaultErrorHandlerContext(
                 null, // only required to pass for 
DeserializationExceptionHandler
                 internalProcessorContext.topic(),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 3a139b3233c..36c46c4b16c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -125,6 +125,24 @@ public class ProcessorNodeTest {
         assertDoesNotThrow(() -> node.process(new Record<>(KEY, VALUE, 
TIMESTAMP)));
     }
 
+    @Test
+    public void shouldRethrowExceptionWhenProcessingExceptionHandlerIsNull() {
+        // This simulates the global thread case where no 
ProcessingExceptionHandler is set
+        final ProcessorNode<Object, Object, Object, Object> node =
+            new ProcessorNode<>(NAME, new 
IgnoredInternalExceptionsProcessor(), Collections.emptySet());
+
+        final InternalProcessorContext<Object, Object> 
internalProcessorContext = mockInternalProcessorContext();
+        // Initialize without a ProcessingExceptionHandler (simulates global 
thread initialization)
+        node.init(internalProcessorContext);
+
+        // The exception should be rethrown since there's no handler to 
process it
+        final RuntimeException exception = assertThrows(RuntimeException.class,
+            () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
+
+        assertEquals("Processing exception should be caught and handled by the 
processing exception handler.",
+            exception.getMessage());
+    }
+
     @ParameterizedTest
     @CsvSource({
         "FailedProcessingException,java.lang.RuntimeException,Fail processing",

Reply via email to