Hi,

I agree that this change could cause the processing exception handler
to crash under very specific circumstances. I think it's a fairly rare
case, but since we are introducing this as a feature, not as a bug fix,
you are right that we should follow the proposed route of adding a new,
immediately deprecated configuration to be on the safe side.
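
To make the opt-in idea concrete, here is a rough sketch of how such a
gate could behave. This is only an illustration under assumptions: the
config key, the class, and the simplified Handler/Result types below
are hypothetical placeholders, not the Kafka Streams API and not what
the KIP specifies.

```java
import java.util.Map;

// Sketch only: every name here is a hypothetical placeholder, not a Kafka Streams API.
public class GlobalHandlerGate {
    // Hypothetical config key; the KIP would have to define the real name.
    public static final String ENABLE_CONFIG = "processing.exception.handler.global.enabled";

    // Simplified stand-in for the handler's result type.
    public enum Result { RESUME, FAIL }

    // Simplified stand-in for a user-provided processing exception handler.
    public interface Handler {
        Result handle(String topic, Exception error);
    }

    // Defaults to false, so existing applications keep today's behavior.
    public static boolean globalHandlingEnabled(Map<String, ?> configs) {
        Object value = configs.get(ENABLE_CONFIG);
        return value != null && Boolean.parseBoolean(value.toString());
    }

    // Returns true if processing should continue with the next record,
    // false if the exception should propagate as it does today.
    public static boolean onGlobalProcessingError(Map<String, ?> configs,
                                                  Handler handler,
                                                  String topic,
                                                  Exception error) {
        if (!globalHandlingEnabled(configs) || handler == null) {
            return false; // feature disabled or no handler: handler is never invoked
        }
        return handler.handle(topic, error) == Result.RESUME;
    }

    public static void main(String[] args) {
        Handler resume = (topic, error) -> Result.RESUME;
        Exception boom = new RuntimeException("boom");
        // Config not set: the handler is skipped and the error propagates.
        System.out.println(onGlobalProcessingError(Map.of(), resume, "global-topic", boom));
        // Config explicitly enabled: the handler decides.
        System.out.println(onGlobalProcessingError(
                Map.of(ENABLE_CONFIG, "true"), resume, "global-topic", boom));
    }
}
```

With the default (config absent) the handler is never invoked for
global-thread errors, so a handler that is not prepared for them cannot
be surprised; users opt in explicitly until the default flips.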

Cheers,
Lucas


On Wed, Feb 11, 2026 at 2:48 PM Arpit Goyal <[email protected]> wrote:
>
> Thanks, Matthias, for the valuable input.
> Yes, it may crash, depending on the existing handler implementation, or
> even lead to false information if alerting/logging is implemented in the
> handler.
> But what I feel is that even if the handler crashes in the new
> implementation, we are no worse off than the current state, where
> GlobalKTable exceptions already crash or shut down the application.
>
> Thanks and Regards
> Arpit Goyal
> 8861094754
>
>
> On Wed, Feb 11, 2026 at 1:39 PM Matthias J. Sax <[email protected]> wrote:
>
> > Arpit,
> >
> > I was just re-reading the KIP, and I am wondering if the proposed change
> > is really fully backward compatible? The KIP says:
> >
> > > This change is fully backward compatible:
> > >   - Applications without a configured handler experience no behavior change
> > >   - Applications with a configured handler automatically get GlobalKTable support
> >
> > The second point seems to be a change in behavior though? In the very
> > extreme case, the handler code might not be prepared for this case, and
> > could crash, or cause undesired side-effects?
> >
> > So I am wondering, if we would actually need a config which by default
> > would keep this new feature disabled, and user would actively need to
> > change the config to enable it?
> >
> > If we go this route, we could also immediately _deprecate_ this new
> > config (yes, it's a little odd to add a new config and deprecate it right
> > away, but we have done this in the past), informing users that with
> > Kafka Streams 5.0, the handler will be invoked for "global Processor"
> > errors by default (and it's not possible to turn it off any longer).
> >
> >
> > Maybe I am too worried here, but based on experience, users do many
> > unexpected things, and to guard ourselves against surprises, it might be
> > the safer option, to play it conservative?
> >
> >
> > Thoughts? -- Before you make any changes to the KIP, let's hear from
> > others. I guess I can also be convinced that the proposed change is safe
> > enough as-is, and introducing a config would be overkill...
> >
> >
> >
> > -Matthias
> >
> >
> >
> > On 2/2/26 9:04 AM, Arpit Goyal wrote:
> > > Hi Everyone
> > > I initiated the voting thread for this KIP.
> > >
> > > Thanks and Regards
> > > Arpit Goyal
> > > 8861094754
> > >
> > > On Tue, 27 Jan, 2026, 10:41 pm Arpit Goyal, <[email protected]>
> > > wrote:
> > >
> > >> Thanks everyone for the input. Should I start voting on it ?
> > >>
> > >> Thanks and Regards
> > >> Arpit Goyal
> > >> 8861094754
> > >>
> > >> On Tue, 27 Jan, 2026, 2:10 pm Arpit Goyal, <[email protected]>
> > >> wrote:
> > >>
> > >>> Thanks Matthias.
> > >>> That makes sense. Client can use  the single handler implementation to
> > >>> support error handling for both Stream Thread and Global thread. There
> > is
> > >>> no need to introduce ThreadType parameter or another configuration for
> > the
> > >>> same.
> > >>> @Lucas Brutschy <[email protected]>  This should answer your query?
> > >>>
> > >>> Thanks and Regards
> > >>> Arpit Goyal
> > >>> 8861094754
> > >>>
> > >>>
> > >>> On Tue, Jan 27, 2026 at 11:43 AM Matthias J. Sax <[email protected]>
> > >>> wrote:
> > >>>
> > >>>> I don't think we would need multiple handlers. The handler is invoked
> > >>>> passing in `ErrorHandlerContext` parameter, which provides enough
> > >>>> information to distinguish the case (eg, topic(), processorNodeId(),
> > and
> > >>>> taskId()), so users can implement different logic inside the same
> > >>>> handler for the different cases if necessary.
> > >>>>
> > >>>>
> > >>>> -Matthias
> > >>>>
> > >>>>
> > >>>> On 1/23/26 10:05 AM, Arpit Goyal wrote:
> > >>>>> Thanks Bill and Lucas for the feedback.
> > >>>>>
> > >>>>> LB1:  I was wondering precisely what we are logging in the DLQ case.
> > Do
> > >>>>>            you intend to log the full record content to make the
> > record
> > >>>> content
> > >>>>>            recoverable, or only some metadata. I suppose it's the
> > >>>> latter.
> > >>>>>
> > >>>>>
> > >>>>>     >   Since GlobalKTable lacks producer infrastructure, DLQ records
> > >>>> will be
> > >>>>> logged with full metadata but NOT sent to a Kafka topic
> > >>>>> LB2:   Maybe I missed it (also not super fluent in that part of the
> > >>>>>             code), but will the implementers of the
> > >>>>> `ProcessingExceptionHandler` be
> > >>>>>             able to tell whether the error originated from a
> > >>>> globalThread or a
> > >>>>>            StreamsThread? The implementer may want to specialize
> > >>>> handling for
> > >>>>>            each case. This is not a must, but would be a nice to have
> > >>>> for
> > >>>>> sure.
> > >>>>>
> > >>>>>    >   Great question! We have two options here:
> > >>>>>             Option 1: Single Handler Configuration
> > >>>>>
> > >>>>>
> > >>>>>                    Users define one implementation of
> > >>>>> ProcessingExceptionHandler that handles errors for all stream types
> > >>>>> (KStream, KTable, and GlobalKTable). This maintains
> > >>>>>                     consistency with the existing
> > >>>>> DeserializationExceptionHandler pattern.
> > >>>>>            Limitation: This will enforce the same handling behavior
> > for
> > >>>>> global exception handling that we defined for KStream processing
> > >>>> exception
> > >>>>> handling. This keeps things
> > >>>>>                              simple but is not flexible enough for
> > >>>> users who
> > >>>>> may want different behavior for GlobalKTable.
> > >>>>>              Option 2: Separate Optional Configuration
> > >>>>>
> > >>>>>
> > >>>>>                              Introduce a new optional configuration:
> > >>>>> global.processing.exception.handler. If configured, it applies
> > >>>> specifically
> > >>>>> to GlobalKTable processing errors; if not
> > >>>>>                              configured, exceptions bubble up to the
> > >>>> uncaught
> > >>>>> exception handler (maintaining current behavior and backward
> > >>>>> compatibility).
> > >>>>>               Limitation: Requires two configuration properties if
> > >>>> users want
> > >>>>> exception handling for both regular streams and GlobalKTable.
> > >>>>>
> > >>>>>              With Option 1 -  ProcessingExceptionHandler does not
> > have a
> > >>>> way
> > >>>>> to identify which thread is invoking it as of now. We may need to
> > >>>> introduce
> > >>>>> ThreadType(Stream or Global)  in errorHandlerContext with ThreadType
> > >>>>> information.
> > >>>>>              With Option 2 - Client would always be aware of the
> > class
> > >>>> it
> > >>>>> has  implemented for GlobalKTables.
> > >>>>>
> > >>>>> Thanks and Regards
> > >>>>> Arpit Goyal
> > >>>>> 8861094754
> > >>>>>
> > >>>>>
> > >>>>> On Fri, Jan 23, 2026 at 9:43 PM Bill Bejeck <[email protected]>
> > >>>> wrote:
> > >>>>>
> > >>>>>> Hi All,
> > >>>>>>
> > >>>>>> Thanks for the KIP! Makes sense to me and helps make KS more robust.
> > >>>>>> I don't have any additional comments beyond what's been said so far.
> > >>>>>>
> > >>>>>> -Bill
> > >>>>>>
> > >>>>>> On Fri, Jan 23, 2026 at 5:52 AM Lucas Brutschy via dev <
> > >>>>>> [email protected]>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Hi,
> > >>>>>>>
> > >>>>>>> Overall, this makes sense to me. Thanks for the KIP!
> > >>>>>>>
> > >>>>>>> LB1: I was wondering precisely what we are logging in the DLQ case.
> > >>>> Do
> > >>>>>>> you intend to log the full record content to make the record content
> > >>>>>>> recoverable, or only some metadata. I suppose it's the latter.
> > >>>>>>>
> > >>>>>>> LB2: Maybe I missed it (also not super fluent in that part of the
> > >>>>>>> code), but will the implementers of the `ProcessingExceptionHandler`
> > >>>> be
> > >>>>>>> able to tell whether the error originated from a globalThread or a
> > >>>>>>> StreamsThread? The implementer may want to specialize handling for
> > >>>>>>> each case. This is not a must, but would be a nice to have for
> > sure.
> > >>>>>>>
> > >>>>>>> Cheers,
> > >>>>>>> Lucas
> > >>>>>>>
> > >>>>>>> On Thu, Jan 15, 2026 at 8:55 AM Arpit Goyal <
> > >>>> [email protected]>
> > >>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>> Hi All,
> > >>>>>>>> Looking for more inputs and feedback. This would help to move this
> > >>>> KIP
> > >>>>>>>> forward.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> Thanks and Regards
> > >>>>>>>> Arpit Goyal
> > >>>>>>>> 8861094754
> > >>>>>>>>
> > >>>>>>>> On Tue, 13 Jan, 2026, 2:17 pm Arpit Goyal, <
> > >>>> [email protected]>
> > >>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Thanks for the response Matthias.
> > >>>>>>>>> I have updated the KIP to include KIP-1034 handleError()
> > automatic
> > >>>>>>>>> backward compatibility. DLQ part I already mentioned under the
> > >>>>>>> Limitation
> > >>>>>>>>> section. Let me know if it needs to be improved further.
> > >>>>>>>>> Thanks and Regards
> > >>>>>>>>> Arpit Goyal
> > >>>>>>>>> 8861094754
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On Tue, Jan 13, 2026 at 5:05 AM Matthias J. Sax <
> > [email protected]>
> > >>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Thanks for the clarification. Makes sense to me.
> > >>>>>>>>>>
> > >>>>>>>>>> Might be good to add some of these details (no code reference to
> > >>>>>>>>>> `ProcessorNode` etc. necessary as it's an impl detail) to the KIP. I.e., state
> > >>>>>>>>>> explicitly that the new handleError() will be used and that it provides
> > >>>>>>>>>> backward compatibility automatically based on its current implementation
> > >>>>>>>>>> from KIP-1034.
> > >>>>>>>>>>
> > >>>>>>>>>> And that DLQ records, if returned, would be ignored and dropped
> > >>>> and
> > >>>>>> a
> > >>>>>>>>>> warning is logged about it for this case.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> -Matthias
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On 1/12/26 2:29 AM, Arpit Goyal wrote:
> > >>>>>>>>>>> Thank you for the detailed questions! Let me clarify the
> > >>>>>>> implementation
> > >>>>>>>>>>> approach:
> > >>>>>>>>>>>
> > >>>>>>>>>>>      Which Method Will Be Called?
> > >>>>>>>>>>>
> > >>>>>>>>>>>      GlobalThread will call the NEW handleError() method (not
> > the
> > >>>>>>>>>> deprecated
> > >>>>>>>>>>> handle()).
> > >>>>>>>>>>>
> > >>>>>>>>>>>      Key Point: The exception handler is not called directly by
> > >>>>>>>>>> GlobalThread.
> > >>>>>>>>>>> Instead, it's called by ProcessorNode.process(), which already
> > >>>>>>> invokes
> > >>>>>>>>>>> handleError() for regular processors.
> > >>>>>>>>>>>
> > >>>>>>>>>>>      The implementation is straightforward:
> > >>>>>>>>>>>
> > >>>>>>>>>>>      Current code (GlobalStateUpdateTask.initTopology - line
> > 161):
> > >>>>>>>>>>>      node.init((InternalProcessorContext)
> > this.processorContext);
> > >>>>>> //
> > >>>>>>> No
> > >>>>>>>>>>> handler passed
> > >>>>>>>>>>>
> > >>>>>>>>>>>      Proposed change:
> > >>>>>>>>>>>      node.init((InternalProcessorContext)
> > this.processorContext,
> > >>>>>>>>>>> processingExceptionHandler);  // Pass handler
> > >>>>>>>>>>>
> > >>>>>>>>>>>      Once the handler is passed to ProcessorNode, the same code
> > >>>> path
> > >>>>>>> that
> > >>>>>>>>>>> handles exceptions for regular KStream/KTable processors
> > >>>>>>>>>>> (ProcessorNode.process() line 236) will automatically handle
> > >>>>>>>>>> GlobalKTable
> > >>>>>>>>>>> exceptions:
> > >>>>>>>>>>>
> > >>>>>>>>>>>      Response response =
> > >>>>>>>>>>> processingExceptionHandler.handleError(errorHandlerContext,
> > >>>>>> record,
> > >>>>>>>>>>> exception);
> > >>>>>>>>>>>
> > >>>>>>>>>>>      There's no separate code path for GlobalThread - it reuses
> > >>>> the
> > >>>>>>>>>> existing
> > >>>>>>>>>>> ProcessorNode exception handling mechanism.
> > >>>>>>>>>>>
> > >>>>>>>>>>>      Backward Compatibility
> > >>>>>>>>>>>
> > >>>>>>>>>>>      The handleError() method provides automatic backward
> > >>>>>>> compatibility
> > >>>>>>>>>> via
> > >>>>>>>>>>> its default implementation:
> > >>>>>>>>>>>
> > >>>>>>>>>>>      default Response handleError(...) {
> > >>>>>>>>>>>          return new Response(Result.from(handle(...)),
> > >>>>>>>>>>> Collections.emptyList());
> > >>>>>>>>>>>      }
> > >>>>>>>>>>>
> > >>>>>>>>>>>      - If users implement the old handle() method:
> > handleError()
> > >>>>>>>>>> delegates to
> > >>>>>>>>>>> it automatically
> > >>>>>>>>>>>      - If users implement the new handleError() method: it's
> > used
> > >>>>>>> directly
> > >>>>>>>>>>>      - No code changes required for existing applications
> > >>>>>>>>>>>
> > >>>>>>>>>>>      Dead Letter Queue (DLQ) Support
> > >>>>>>>>>>>
> > >>>>>>>>>>>      This is where GlobalKTable differs from regular
> > processors:
> > >>>>>>>>>>>
> > >>>>>>>>>>>      The Limitation: GlobalThread does not have a Producer, so
> > it
> > >>>>>>> cannot
> > >>>>>>>>>> send
> > >>>>>>>>>>> DLQ records to Kafka.
> > >>>>>>>>>>>
> > >>>>>>>>>>>      Proposed Approach:
> > >>>>>>>>>>>
> > >>>>>>>>>>>      1. For KIP-1270: When ProcessorNode detects DLQ records
> > but
> > >>>> the
> > >>>>>>>>>> context
> > >>>>>>>>>>> doesn't support RecordCollector (i.e., GlobalThread), it will
> > log
> > >>>>>> a
> > >>>>>>>>>> warning
> > >>>>>>>>>>> instead of sending:
> > >>>>>>>>>>>
> > >>>>>>>>>>>      log.warn("Dead letter queue records cannot be sent for
> > >>>>>>> GlobalKTable
> > >>>>>>>>>>> processors " +
> > >>>>>>>>>>>               "(no producer available). DLQ support for
> > >>>> GlobalKTable
> > >>>>>>> will
> > >>>>>>>>>> be
> > >>>>>>>>>>> addressed in a future KIP. " +
> > >>>>>>>>>>>               "Record details logged: topic={}, headers={}",
> > ...);
> > >>>>>>>>>>>
> > >>>>>>>>>>>      2. Future KIP: Full DLQ support for GlobalKTable (requires
> > >>>>>> adding
> > >>>>>>>>>>> Producer infrastructure) will be proposed separately, as it's a
> > >>>>>>> larger
> > >>>>>>>>>>> architectural change.
> > >>>>>>>>>>>
> > >>>>>>>>>>>      How This Avoids User Confusion
> > >>>>>>>>>>>
> > >>>>>>>>>>>      1. Single handler for all processors: Users configure ONE
> > >>>>>>>>>>> ProcessingExceptionHandler that works for both regular and
> > global
> > >>>>>>>>>> processors
> > >>>>>>>>>>>      2. Consistent behavior: Result.RESUME continues,
> > Result.FAIL
> > >>>>>>> stops -
> > >>>>>>>>>> same
> > >>>>>>>>>>> for both
> > >>>>>>>>>>>      3. Clear limitation: DLQ records are logged (not sent) for
> > >>>>>>>>>> GlobalKTable,
> > >>>>>>>>>>> with explicit warning message
> > >>>>>>>>>>>      4. Documentation: Config docs will clearly state DLQ
> > sending
> > >>>>>>>>>> limitation
> > >>>>>>>>>>> for GlobalKTable
> > >>>>>>>>>>> Thanks and Regards
> > >>>>>>>>>>> Arpit Goyal
> > >>>>>>>>>>> 8861094754
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Mon, Jan 12, 2026 at 7:40 AM Matthias J. Sax <
> > >>>> [email protected]
> > >>>>>>>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Thanks for the KIP Arpit.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Can you elaborate a little bit more on details? With the newly
> > >>>>>>> added
> > >>>>>>>>>> DLQ
> > >>>>>>>>>>>> support for regular `Processor`, the existing
> > >>>>>>>>>>>> `ProcessingHandlerResponse` and corresponding method
> > `handle()`
> > >>>>>> are
> > >>>>>>>>>>>> deprecated with upcoming 4.2 release.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thus, from AK 4.2+ going forward, users are expected to not
> > >>>>>>> implement
> > >>>>>>>>>>>> the old `handle()` (even if it's still supported, as long as
> > the
> > >>>>>>> new
> > >>>>>>>>>>>> `handleError` is not overwritten).
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Are you proposing, for now, to only add support for the
> > >>>>>> deprecated
> > >>>>>>>>>>>> `handle()` method, ie, the new `handleError()` method would
> > not
> > >>>>>> be
> > >>>>>>>>>>>> called by the global-thread code? If yes, this might be
> > >>>> confusing
> > >>>>>>> for
> > >>>>>>>>>>>> users?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> If you do not propose this, would it imply that the
> > >>>> global-thread
> > >>>>>>> code
> > >>>>>>>>>>>> would call the new `handleError()` method? For this case, the
> > >>>>>>> question
> > >>>>>>>>>>>> is what the runtime would do if users try to use the DLQ
> > >>>> feature?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Overall, it's unclear to me what you propose in detail and how we can
> > >>>>>>>>>>>> avoid confusing users, keep it backward compatible, and make it easy to
> > >>>>>>>>>>>> understand how the handler will work.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On 1/10/26 8:33 AM, Arpit Goyal wrote:
> > >>>>>>>>>>>>> Hi Team
> > >>>>>>>>>>>>> Just reaching out again. I need your input to move this forward.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Thanks and Regards
> > >>>>>>>>>>>>> Arpit Goyal
> > >>>>>>>>>>>>> 8861094754
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Thu, 8 Jan, 2026, 10:03 pm Arpit Goyal, <
> > >>>>>>> [email protected]>
> > >>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hi All,
> > >>>>>>>>>>>>>> I would like to start a discussion for KIP-1270.  This KIP
> > >>>>>>> extends
> > >>>>>>>>>>>>>> ProcessingExceptionHandler support to GlobalKTable
> > processors,
> > >>>>>>>>>> enabling
> > >>>>>>>>>>>>>> consistent exception handling across all stream processing
> > >>>>>> types.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> * Current Behavior*
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>       When a processing exception occurs in a GlobalKTable
> > >>>>>>> processor:
> > >>>>>>>>>>>>>>       - The exception propagates to GlobalStreamThread
> > >>>>>>>>>>>>>>       - The GlobalStreamThread terminates
> > >>>>>>>>>>>>>>       - The entire Kafka Streams application shuts down
> > >>>>>>>>>>>>>>       - No user-configurable exception handling is available
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> *  Proposed Behavior*
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>       After this KIP, when a processing exception occurs in
> > a
> > >>>>>>>>>> GlobalKTable
> > >>>>>>>>>>>>>> processor:
> > >>>>>>>>>>>>>>       - The configured
> > ProcessingExceptionHandler.handleError()
> > >>>>>>> will be
> > >>>>>>>>>>>> invoked
> > >>>>>>>>>>>>>>       - If the handler returns Result.RESUME, processing
> > >>>>>> continues
> > >>>>>>>>>> with the
> > >>>>>>>>>>>>>> next record
> > >>>>>>>>>>>>>>       - If the handler returns Result.FAIL, the exception
> > >>>>>>> propagates
> > >>>>>>>>>> (same
> > >>>>>>>>>>>> as
> > >>>>>>>>>>>>>> current behavior)
> > >>>>>>>>>>>>>>       - If no handler is configured, behavior remains
> > unchanged
> > >>>>>>>>>> (backward
> > >>>>>>>>>>>>>> compatible)
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> KIP:
> > >>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Introduce+ProcessExceptionalHandler+for+GlobalThread
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Thanks and Regards
> > >>>>>>>>>>>>>> Arpit Goyal
> > >>>>>>>>>>>>>> 8861094754
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >
> >
> >
