Thanks, Matthias, for the valuable input.
Yes, it may crash, depending on the existing handler implementation, or
even lead to false information if alerting/logging is implemented in the
handler.
But what I feel is that even if the handler crashes in the new
implementation, we are no worse off than the current state, where
GlobalKTable exceptions already crash or shut down the application.

Thanks and Regards
Arpit Goyal
8861094754


On Wed, Feb 11, 2026 at 1:39 PM Matthias J. Sax <[email protected]> wrote:

> Arpit,
>
> I was just re-reading the KIP, and I am wondering if the proposed change
> is really fully backward compatible? The KIP says:
>
> > This change is fully backward compatible:
> >   - Applications without a configured handler experience no behavior
> change
> >   - Applications with a configured handler automatically get
> GlobalKTable support
>
> The second point seems to be a change in behavior though? In the very
> extreme case, the handler code might not be prepared for this case, and
> could crash, or cause undesired side-effects?
>
> So I am wondering, if we would actually need a config which by default
> would keep this new feature disabled, and user would actively need to
> change the config to enable it?
>
> If we go this route, we could also immediately _deprecate_ this new
> config (yes, it's a little odd to add a new config an deprecate it right
> away, but we have done this in the past), informing users that with
> Kafka Streams 5.0, the handler will be invoked for "global Processor"
> errors by default (and it's not possible to turn it off any longer).
>
>
> Maybe I am too worried here, but based on experience, users do many
> unexpected things, and to guard ourselves against surprises, it might be
> the safer option, to play it conservative?
>
>
> Thoughts? -- Before you make any changes to the KIP, let's hear from
> others. I guess I can also be convinces that the propose change is safe
> enough as-is, and introducing a config would be overkill...
>
>
>
> -Matthias
>
>
>
> On 2/2/26 9:04 AM, Arpit Goyal wrote:
> > Hi Everyone
> > I initiated the voting thread for this KIP.
> >
> > Thanks and Regards
> > Arpit Goyal
> > 8861094754
> >
> > On Tue, 27 Jan, 2026, 10:41 pm Arpit Goyal, <[email protected]>
> > wrote:
> >
> >> Thanks everyone for the input. Should I start voting on it ?
> >>
> >> Thanks and Regards
> >> Arpit Goyal
> >> 8861094754
> >>
> >> On Tue, 27 Jan, 2026, 2:10 pm Arpit Goyal, <[email protected]>
> >> wrote:
> >>
> >>> Thanks Matthias.
> >>> That makes sense. Client can use  the single handler implementation to
> >>> support error handling for both Stream Thread and Global thread. There
> is
> >>> no need to introduce ThreadType parameter or another configuration for
> the
> >>> same.
> >>> @Lucas Brutschy <[email protected]>  It must answered your query
> ?
> >>>
> >>> Thanks and Regards
> >>> Arpit Goyal
> >>> 8861094754
> >>>
> >>>
> >>> On Tue, Jan 27, 2026 at 11:43 AM Matthias J. Sax <[email protected]>
> >>> wrote:
> >>>
> >>>> I don't think we would need multiple handlers. The handler is invoked
> >>>> passing in `ErrorHandlerContext` parameter, which provides enough
> >>>> information to distinguish the case (eg, topic(), processorNodeId(),
> and
> >>>> taskId()), so users can implement different logic inside the same
> >>>> handler for the different cases if necessary.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 1/23/26 10:05 AM, Arpit Goyal wrote:
> >>>>> Thanks Bill and Lucas for the feedback.
> >>>>>
> >>>>> LB1:  I was wondering precisely what we are logging in the DLQ case.
> Do
> >>>>>            you intend to log the full record content to make he
> record
> >>>> content
> >>>>>            recoverable, or only some metadata. I suppose it's the
> >>>> latter.
> >>>>>
> >>>>>
> >>>>>     >   Since GlobalKTable lacks producer infrastructure, DLQ records
> >>>> will be
> >>>>> logged with full metadata but NOT sent to a Kafka topic
> >>>>> LB2:   Maybe I missed it (also not super fluent in that part of the
> >>>>>             code), but will the implementers of the
> >>>>> `ProcessExceptionalHandler` be
> >>>>>             able to tell whether the error originated from a
> >>>> globalThread or a
> >>>>>            StreamsThread? The implementer may want to specialize
> >>>> handling for
> >>>>>            each case. This is not a must, but would be a nice to have
> >>>> for
> >>>>> sure.
> >>>>>
> >>>>>    >.   Great question! We have two options here
> >>>>>             Option 1: Single Handler Configuration
> >>>>>
> >>>>>
> >>>>>                    Users define one implementation of
> >>>>> ProcessingExceptionHandler that handles errors for all stream types
> >>>>> (KStream, KTable, and GlobalKTable). This maintains
> >>>>>                     consistency with the existing
> >>>>> DeserializationExceptionHandler pattern.
> >>>>>            Limitation: This will enforce the same handling behavior
> for
> >>>>> global exception handling that we defined for KStream processing
> >>>> exception
> >>>>> handling. This keeps things
> >>>>>                              simple but is not flexible enough for
> >>>> users who
> >>>>> may want different behavior for GlobalKTable.
> >>>>>              Option 2: Separate Optional Configuration
> >>>>>
> >>>>>
> >>>>>                              Introduce a new optional configuration:
> >>>>> global.processing.exception.handler. If configured, it applies
> >>>> specifically
> >>>>> to GlobalKTable processing errors; if not
> >>>>>                              configured, exceptions bubble up to the
> >>>> uncaught
> >>>>> exception handler (maintaining current behavior and backward
> >>>>> compatibility).
> >>>>>               Limitation: Requires two configuration properties if
> >>>> users want
> >>>>> exception handling for both regular streams and GlobalKTable.
> >>>>>
> >>>>>              With Option 1 -  ProcessExceptionalHandler does not
> have a
> >>>> way
> >>>>> to identify which thread is invoking it as of now. We may need to
> >>>> introduce
> >>>>> ThreadType(Stream or Global)  in errorHandlerContext with ThreadType
> >>>>> information.
> >>>>>              With Option 2 - Client would always be aware of the
> class
> >>>> it
> >>>>> has  implemented for GlobalKTables.
> >>>>>
> >>>>> Thanks and Regards
> >>>>> Arpit Goyal
> >>>>> 8861094754
> >>>>>
> >>>>>
> >>>>> On Fri, Jan 23, 2026 at 9:43 PM Bill Bejeck <[email protected]>
> >>>> wrote:
> >>>>>
> >>>>>> Hi All,
> >>>>>>
> >>>>>> Thanks for the KIP! Makes sense to me and helps make KS more robust.
> >>>>>> I don't have any additional comments beyond what's been said so far.
> >>>>>>
> >>>>>> -Bill
> >>>>>>
> >>>>>> On Fri, Jan 23, 2026 at 5:52 AM Lucas Brutschy via dev <
> >>>>>> [email protected]>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> Overall, this makes sense to me. Thanks for the KIP!
> >>>>>>>
> >>>>>>> LB1: I was wondering precisely what we are logging in the DLQ case.
> >>>> Do
> >>>>>>> you intend to log the full record content to make he record content
> >>>>>>> recoverable, or only some metadata. I suppose it's the latter.
> >>>>>>>
> >>>>>>> LB2: Maybe I missed it (also not super fluent in that part of the
> >>>>>>> code), but will the implementers of the `ProcessExceptionalHandler`
> >>>> be
> >>>>>>> able to tell whether the error originated from a globalThread or a
> >>>>>>> StreamsThread? The implementer may want to specialize handling for
> >>>>>>> each case. This is not a must, but would be a nice to have for
> sure.
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Lucas
> >>>>>>>
> >>>>>>> On Thu, Jan 15, 2026 at 8:55 AM Arpit Goyal <
> >>>> [email protected]>
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>> Hi All,
> >>>>>>>> Looking for more inputs and feedback. This would help to move this
> >>>> KIP
> >>>>>>>> forward.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Thanks and Regards
> >>>>>>>> Arpit Goyal
> >>>>>>>> 8861094754
> >>>>>>>>
> >>>>>>>> On Tue, 13 Jan, 2026, 2:17 pm Arpit Goyal, <
> >>>> [email protected]>
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Thanks for the response Matthias.
> >>>>>>>>> I have updated the KIP to include KIP-1034 handleError()
> automatic
> >>>>>>>>> backward compatibility. DLQ part I already mentioned under the
> >>>>>>> Limitation
> >>>>>>>>> section. Let me know if it needs to be improved further.
> >>>>>>>>> Thanks and Regards
> >>>>>>>>> Arpit Goyal
> >>>>>>>>> 8861094754
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Tue, Jan 13, 2026 at 5:05 AM Matthias J. Sax <
> [email protected]>
> >>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Thanks for the clarification. Make sense to me.
> >>>>>>>>>>
> >>>>>>>>>> Might be good to add some of these details (no code reference to
> >>>>>>>>>> `ProcessorNode` etc necessary as it's impl detail) to the KIP.
> Ie,
> >>>>>>> state
> >>>>>>>>>> explicitly that the new handleError() will be used and that it
> >>>>>>> provides
> >>>>>>>>>> backward compatibility automatically based on it's current
> >>>>>>> implementaion
> >>>>>>>>>> from KIP-1034.
> >>>>>>>>>>
> >>>>>>>>>> And that DLQ records, if returned, would be ignored and dropped
> >>>> and
> >>>>>> a
> >>>>>>>>>> warning is logged about it for this case.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 1/12/26 2:29 AM, Arpit Goyal wrote:
> >>>>>>>>>>> Thank you for the detailed questions! Let me clarify the
> >>>>>>> implementation
> >>>>>>>>>>> approach:
> >>>>>>>>>>>
> >>>>>>>>>>>      Which Method Will Be Called?
> >>>>>>>>>>>
> >>>>>>>>>>>      GlobalThread will call the NEW handleError() method (not
> the
> >>>>>>>>>> deprecated
> >>>>>>>>>>> handle()).
> >>>>>>>>>>>
> >>>>>>>>>>>      Key Point: The exception handler is not called directly by
> >>>>>>>>>> GlobalThread.
> >>>>>>>>>>> Instead, it's called by ProcessorNode.process(), which already
> >>>>>>> invokes
> >>>>>>>>>>> handleError() for regular processors.
> >>>>>>>>>>>
> >>>>>>>>>>>      The implementation is straightforward:
> >>>>>>>>>>>
> >>>>>>>>>>>      Current code (GlobalStateUpdateTask.initTopology - line
> 161):
> >>>>>>>>>>>      node.init((InternalProcessorContext)
> this.processorContext);
> >>>>>> //
> >>>>>>> No
> >>>>>>>>>>> handler passed
> >>>>>>>>>>>
> >>>>>>>>>>>      Proposed change:
> >>>>>>>>>>>      node.init((InternalProcessorContext)
> this.processorContext,
> >>>>>>>>>>> processingExceptionHandler);  // Pass handler
> >>>>>>>>>>>
> >>>>>>>>>>>      Once the handler is passed to ProcessorNode, the same code
> >>>> path
> >>>>>>> that
> >>>>>>>>>>> handles exceptions for regular KStream/KTable processors
> >>>>>>>>>>> (ProcessorNode.process() line 236) will automatically handle
> >>>>>>>>>> GlobalKTable
> >>>>>>>>>>> exceptions:
> >>>>>>>>>>>
> >>>>>>>>>>>      Response response =
> >>>>>>>>>>> processingExceptionHandler.handleError(errorHandlerContext,
> >>>>>> record,
> >>>>>>>>>>> exception);
> >>>>>>>>>>>
> >>>>>>>>>>>      There's no separate code path for GlobalThread - it reuses
> >>>> the
> >>>>>>>>>> existing
> >>>>>>>>>>> ProcessorNode exception handling mechanism.
> >>>>>>>>>>>
> >>>>>>>>>>>      Backward Compatibility
> >>>>>>>>>>>
> >>>>>>>>>>>      The handleError() method provides automatic backward
> >>>>>>> compatibility
> >>>>>>>>>> via
> >>>>>>>>>>> its default implementation:
> >>>>>>>>>>>
> >>>>>>>>>>>      default Response handleError(...) {
> >>>>>>>>>>>          return new Response(Result.from(handle(...)),
> >>>>>>>>>>> Collections.emptyList());
> >>>>>>>>>>>      }
> >>>>>>>>>>>
> >>>>>>>>>>>      - If users implement the old handle() method:
> handleError()
> >>>>>>>>>> delegates to
> >>>>>>>>>>> it automatically
> >>>>>>>>>>>      - If users implement the new handleError() method: it's
> used
> >>>>>>> directly
> >>>>>>>>>>>      - No code changes required for existing applications
> >>>>>>>>>>>
> >>>>>>>>>>>      Dead Letter Queue (DLQ) Support
> >>>>>>>>>>>
> >>>>>>>>>>>      This is where GlobalKTable differs from regular
> processors:
> >>>>>>>>>>>
> >>>>>>>>>>>      The Limitation: GlobalThread does not have a Producer, so
> it
> >>>>>>> cannot
> >>>>>>>>>> send
> >>>>>>>>>>> DLQ records to Kafka.
> >>>>>>>>>>>
> >>>>>>>>>>>      Proposed Approach:
> >>>>>>>>>>>
> >>>>>>>>>>>      1. For KIP-1270: When ProcessorNode detects DLQ records
> but
> >>>> the
> >>>>>>>>>> context
> >>>>>>>>>>> doesn't support RecordCollector (i.e., GlobalThread), it will
> log
> >>>>>> a
> >>>>>>>>>> warning
> >>>>>>>>>>> instead of sending:
> >>>>>>>>>>>
> >>>>>>>>>>>      log.warn("Dead letter queue records cannot be sent for
> >>>>>>> GlobalKTable
> >>>>>>>>>>> processors " +
> >>>>>>>>>>>               "(no producer available). DLQ support for
> >>>> GlobalKTable
> >>>>>>> will
> >>>>>>>>>> be
> >>>>>>>>>>> addressed in a future KIP. " +
> >>>>>>>>>>>               "Record details logged: topic={}, headers={}",
> ...);
> >>>>>>>>>>>
> >>>>>>>>>>>      2. Future KIP: Full DLQ support for GlobalKTable (requires
> >>>>>> adding
> >>>>>>>>>>> Producer infrastructure) will be proposed separately, as it's a
> >>>>>>> larger
> >>>>>>>>>>> architectural change.
> >>>>>>>>>>>
> >>>>>>>>>>>      How This Avoids User Confusion
> >>>>>>>>>>>
> >>>>>>>>>>>      1. Single handler for all processors: Users configure ONE
> >>>>>>>>>>> ProcessingExceptionHandler that works for both regular and
> global
> >>>>>>>>>> processors
> >>>>>>>>>>>      2. Consistent behavior: Result.RESUME continues,
> Result.FAIL
> >>>>>>> stops -
> >>>>>>>>>> same
> >>>>>>>>>>> for both
> >>>>>>>>>>>      3. Clear limitation: DLQ records are logged (not sent) for
> >>>>>>>>>> GlobalKTable,
> >>>>>>>>>>> with explicit warning message
> >>>>>>>>>>>      4. Documentation: Config docs will clearly state DLQ
> sending
> >>>>>>>>>> limitation
> >>>>>>>>>>> for GlobalKTable
> >>>>>>>>>>> Thanks and Regards
> >>>>>>>>>>> Arpit Goyal
> >>>>>>>>>>> 8861094754
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, Jan 12, 2026 at 7:40 AM Matthias J. Sax <
> >>>> [email protected]
> >>>>>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the KIP Arpit.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Can you elaborate a little bit more on details? With the newly
> >>>>>>> added
> >>>>>>>>>> DLQ
> >>>>>>>>>>>> support for regular `Processor`, the existing
> >>>>>>>>>>>> `ProcessingHandlerResponse` and corresponding method
> `handle()`
> >>>>>> are
> >>>>>>>>>>>> deprecated with upcoming 4.2 release.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thus, from AK 4.2+ going forward, users are expected to not
> >>>>>>> implement
> >>>>>>>>>>>> the old `handle()` (even if it's still supported, as long as
> the
> >>>>>>> new
> >>>>>>>>>>>> `handleError` is not overwritten).
> >>>>>>>>>>>>
> >>>>>>>>>>>> Are you proposing, for now, to only add support for the
> >>>>>> deprecated
> >>>>>>>>>>>> `handle()` method, ie, the new `handleError()` method would
> not
> >>>>>> be
> >>>>>>>>>>>> called by the global-thread code? If yes, this might be
> >>>> confusing
> >>>>>>> for
> >>>>>>>>>>>> users?
> >>>>>>>>>>>>
> >>>>>>>>>>>> If you do not propose this, would it imply that the
> >>>> global-thread
> >>>>>>> code
> >>>>>>>>>>>> would call the new `handlerError()` method? For this case, the
> >>>>>>> question
> >>>>>>>>>>>> is what the runtime would do if users try to use the DLQ
> >>>> feature?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Overall, it's unclear to me what you propose in detail and how
> >>>> we
> >>>>>>> can
> >>>>>>>>>>>> avoid to confuse users, keep it backward compatible, and make
> it
> >>>>>>> easy
> >>>>>>>>>> to
> >>>>>>>>>>>> understanding how the handler will work.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 1/10/26 8:33 AM, Arpit Goyal wrote:
> >>>>>>>>>>>>> Hi Team
> >>>>>>>>>>>>> Just reaching out again.Need your inputs to move it forward
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks and Regards
> >>>>>>>>>>>>> Arpit Goyal
> >>>>>>>>>>>>> 8861094754
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Thu, 8 Jan, 2026, 10:03 pm Arpit Goyal, <
> >>>>>>> [email protected]>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi All,
> >>>>>>>>>>>>>> I would like to start a discussion for KIP-1270.  This KIP
> >>>>>>> extends
> >>>>>>>>>>>>>> ProcessingExceptionHandler support to GlobalKTable
> processors,
> >>>>>>>>>> enabling
> >>>>>>>>>>>>>> consistent exception handling across all stream processing
> >>>>>> types.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> * Current Behavior*
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>       When a processing exception occurs in a GlobalKTable
> >>>>>>> processor:
> >>>>>>>>>>>>>>       - The exception propagates to GlobalStreamThread
> >>>>>>>>>>>>>>       - The GlobalStreamThread terminates
> >>>>>>>>>>>>>>       - The entire Kafka Streams application shuts down
> >>>>>>>>>>>>>>       - No user-configurable exception handling is available
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> *  Proposed Behavior*
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>       After this KIP, when a processing exception occurs in
> a
> >>>>>>>>>> GlobalKTable
> >>>>>>>>>>>>>> processor:
> >>>>>>>>>>>>>>       - The configured
> ProcessingExceptionHandler.handleError()
> >>>>>>> will be
> >>>>>>>>>>>> invoked
> >>>>>>>>>>>>>>       - If the handler returns Result.RESUME, processing
> >>>>>> continues
> >>>>>>>>>> with the
> >>>>>>>>>>>>>> next record
> >>>>>>>>>>>>>>       - If the handler returns Result.FAIL, the exception
> >>>>>>> propagates
> >>>>>>>>>> (same
> >>>>>>>>>>>> as
> >>>>>>>>>>>>>> current behavior)
> >>>>>>>>>>>>>>       - If no handler is configured, behavior remains
> unchanged
> >>>>>>>>>> (backward
> >>>>>>>>>>>>>> compatible)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> KIP:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Introduce+ProcessExceptionalHandler+for+GlobalThread
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks and Regards
> >>>>>>>>>>>>>> Arpit Goyal
> >>>>>>>>>>>>>> 8861094754
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >
>
>

Reply via email to