Hi, I agree that this change could cause the processing error exception handler to crash under very specific circumstances. I think it's a fairly rare case, but since we are raising this as a feature, not as a bug fix, you are right that we should follow the proposed route of new deprecated configuration to be on the safe side.
Cheers, Lucas On Wed, Feb 11, 2026 at 2:48 PM Arpit Goyal <[email protected]> wrote: > > Thanks, Matthias, for the valuable input. > Yes, it may crash, depending on the existing handler implementation, or > even lead to false information if alerting/logging is implemented in the > handler. > But what I feel is that even if the handler crashes in the new > implementation, we are no worse off than the current state, where > GlobalKTable exceptions already crash or shut down the application. > > Thanks and Regards > Arpit Goyal > 8861094754 > > > On Wed, Feb 11, 2026 at 1:39 PM Matthias J. Sax <[email protected]> wrote: > > > Arpit, > > > > I was just re-reading the KIP, and I am wondering if the proposed change > > is really fully backward compatible? The KIP says: > > > > > This change is fully backward compatible: > > > - Applications without a configured handler experience no behavior > > change > > > - Applications with a configured handler automatically get > > GlobalKTable support > > > > The second point seems to be a change in behavior though? In the very > > extreme case, the handler code might not be prepared for this case, and > > could crash, or cause undesired side-effects? > > > > So I am wondering, if we would actually need a config which by default > > would keep this new feature disabled, and user would actively need to > > change the config to enable it? > > > > If we go this route, we could also immediately _deprecate_ this new > > config (yes, it's a little odd to add a new config an deprecate it right > > away, but we have done this in the past), informing users that with > > Kafka Streams 5.0, the handler will be invoked for "global Processor" > > errors by default (and it's not possible to turn it off any longer). > > > > > > Maybe I am too worried here, but based on experience, users do many > > unexpected things, and to guard ourselves against surprises, it might be > > the safer option, to play it conservative? > > > > > > Thoughts? -- Before you make any changes to the KIP, let's hear from > > others. I guess I can also be convinces that the propose change is safe > > enough as-is, and introducing a config would be overkill... > > > > > > > > -Matthias > > > > > > > > On 2/2/26 9:04 AM, Arpit Goyal wrote: > > > Hi Everyone > > > I initiated the voting thread for this KIP. > > > > > > Thanks and Regards > > > Arpit Goyal > > > 8861094754 > > > > > > On Tue, 27 Jan, 2026, 10:41 pm Arpit Goyal, <[email protected]> > > > wrote: > > > > > >> Thanks everyone for the input. Should I start voting on it ? > > >> > > >> Thanks and Regards > > >> Arpit Goyal > > >> 8861094754 > > >> > > >> On Tue, 27 Jan, 2026, 2:10 pm Arpit Goyal, <[email protected]> > > >> wrote: > > >> > > >>> Thanks Matthias. > > >>> That makes sense. Client can use the single handler implementation to > > >>> support error handling for both Stream Thread and Global thread. There > > is > > >>> no need to introduce ThreadType parameter or another configuration for > > the > > >>> same. > > >>> @Lucas Brutschy <[email protected]> It must answered your query > > ? > > >>> > > >>> Thanks and Regards > > >>> Arpit Goyal > > >>> 8861094754 > > >>> > > >>> > > >>> On Tue, Jan 27, 2026 at 11:43 AM Matthias J. Sax <[email protected]> > > >>> wrote: > > >>> > > >>>> I don't think we would need multiple handlers. The handler is invoked > > >>>> passing in `ErrorHandlerContext` parameter, which provides enough > > >>>> information to distinguish the case (eg, topic(), processorNodeId(), > > and > > >>>> taskId()), so users can implement different logic inside the same > > >>>> handler for the different cases if necessary. > > >>>> > > >>>> > > >>>> -Matthias > > >>>> > > >>>> > > >>>> On 1/23/26 10:05 AM, Arpit Goyal wrote: > > >>>>> Thanks Bill and Lucas for the feedback. > > >>>>> > > >>>>> LB1: I was wondering precisely what we are logging in the DLQ case. > > Do > > >>>>> you intend to log the full record content to make he > > record > > >>>> content > > >>>>> recoverable, or only some metadata. I suppose it's the > > >>>> latter. > > >>>>> > > >>>>> > > >>>>> > Since GlobalKTable lacks producer infrastructure, DLQ records > > >>>> will be > > >>>>> logged with full metadata but NOT sent to a Kafka topic > > >>>>> LB2: Maybe I missed it (also not super fluent in that part of the > > >>>>> code), but will the implementers of the > > >>>>> `ProcessExceptionalHandler` be > > >>>>> able to tell whether the error originated from a > > >>>> globalThread or a > > >>>>> StreamsThread? The implementer may want to specialize > > >>>> handling for > > >>>>> each case. This is not a must, but would be a nice to have > > >>>> for > > >>>>> sure. > > >>>>> > > >>>>> >. Great question! We have two options here > > >>>>> Option 1: Single Handler Configuration > > >>>>> > > >>>>> > > >>>>> Users define one implementation of > > >>>>> ProcessingExceptionHandler that handles errors for all stream types > > >>>>> (KStream, KTable, and GlobalKTable). This maintains > > >>>>> consistency with the existing > > >>>>> DeserializationExceptionHandler pattern. > > >>>>> Limitation: This will enforce the same handling behavior > > for > > >>>>> global exception handling that we defined for KStream processing > > >>>> exception > > >>>>> handling. This keeps things > > >>>>> simple but is not flexible enough for > > >>>> users who > > >>>>> may want different behavior for GlobalKTable. > > >>>>> Option 2: Separate Optional Configuration > > >>>>> > > >>>>> > > >>>>> Introduce a new optional configuration: > > >>>>> global.processing.exception.handler. If configured, it applies > > >>>> specifically > > >>>>> to GlobalKTable processing errors; if not > > >>>>> configured, exceptions bubble up to the > > >>>> uncaught > > >>>>> exception handler (maintaining current behavior and backward > > >>>>> compatibility). > > >>>>> Limitation: Requires two configuration properties if > > >>>> users want > > >>>>> exception handling for both regular streams and GlobalKTable. > > >>>>> > > >>>>> With Option 1 - ProcessExceptionalHandler does not > > have a > > >>>> way > > >>>>> to identify which thread is invoking it as of now. We may need to > > >>>> introduce > > >>>>> ThreadType(Stream or Global) in errorHandlerContext with ThreadType > > >>>>> information. > > >>>>> With Option 2 - Client would always be aware of the > > class > > >>>> it > > >>>>> has implemented for GlobalKTables. > > >>>>> > > >>>>> Thanks and Regards > > >>>>> Arpit Goyal > > >>>>> 8861094754 > > >>>>> > > >>>>> > > >>>>> On Fri, Jan 23, 2026 at 9:43 PM Bill Bejeck <[email protected]> > > >>>> wrote: > > >>>>> > > >>>>>> Hi All, > > >>>>>> > > >>>>>> Thanks for the KIP! Makes sense to me and helps make KS more robust. > > >>>>>> I don't have any additional comments beyond what's been said so far. > > >>>>>> > > >>>>>> -Bill > > >>>>>> > > >>>>>> On Fri, Jan 23, 2026 at 5:52 AM Lucas Brutschy via dev < > > >>>>>> [email protected]> > > >>>>>> wrote: > > >>>>>> > > >>>>>>> Hi, > > >>>>>>> > > >>>>>>> Overall, this makes sense to me. Thanks for the KIP! > > >>>>>>> > > >>>>>>> LB1: I was wondering precisely what we are logging in the DLQ case. > > >>>> Do > > >>>>>>> you intend to log the full record content to make he record content > > >>>>>>> recoverable, or only some metadata. I suppose it's the latter. > > >>>>>>> > > >>>>>>> LB2: Maybe I missed it (also not super fluent in that part of the > > >>>>>>> code), but will the implementers of the `ProcessExceptionalHandler` > > >>>> be > > >>>>>>> able to tell whether the error originated from a globalThread or a > > >>>>>>> StreamsThread? The implementer may want to specialize handling for > > >>>>>>> each case. This is not a must, but would be a nice to have for > > sure. > > >>>>>>> > > >>>>>>> Cheers, > > >>>>>>> Lucas > > >>>>>>> > > >>>>>>> On Thu, Jan 15, 2026 at 8:55 AM Arpit Goyal < > > >>>> [email protected]> > > >>>>>>> wrote: > > >>>>>>>> > > >>>>>>>> Hi All, > > >>>>>>>> Looking for more inputs and feedback. This would help to move this > > >>>> KIP > > >>>>>>>> forward. > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> Thanks and Regards > > >>>>>>>> Arpit Goyal > > >>>>>>>> 8861094754 > > >>>>>>>> > > >>>>>>>> On Tue, 13 Jan, 2026, 2:17 pm Arpit Goyal, < > > >>>> [email protected]> > > >>>>>>> wrote: > > >>>>>>>> > > >>>>>>>>> Thanks for the response Matthias. > > >>>>>>>>> I have updated the KIP to include KIP-1034 handleError() > > automatic > > >>>>>>>>> backward compatibility. DLQ part I already mentioned under the > > >>>>>>> Limitation > > >>>>>>>>> section. Let me know if it needs to be improved further. > > >>>>>>>>> Thanks and Regards > > >>>>>>>>> Arpit Goyal > > >>>>>>>>> 8861094754 > > >>>>>>>>> > > >>>>>>>>> > > >>>>>>>>> On Tue, Jan 13, 2026 at 5:05 AM Matthias J. Sax < > > [email protected]> > > >>>>>>> wrote: > > >>>>>>>>> > > >>>>>>>>>> Thanks for the clarification. Make sense to me. > > >>>>>>>>>> > > >>>>>>>>>> Might be good to add some of these details (no code reference to > > >>>>>>>>>> `ProcessorNode` etc necessary as it's impl detail) to the KIP. > > Ie, > > >>>>>>> state > > >>>>>>>>>> explicitly that the new handleError() will be used and that it > > >>>>>>> provides > > >>>>>>>>>> backward compatibility automatically based on it's current > > >>>>>>> implementaion > > >>>>>>>>>> from KIP-1034. > > >>>>>>>>>> > > >>>>>>>>>> And that DLQ records, if returned, would be ignored and dropped > > >>>> and > > >>>>>> a > > >>>>>>>>>> warning is logged about it for this case. > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> -Matthias > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> On 1/12/26 2:29 AM, Arpit Goyal wrote: > > >>>>>>>>>>> Thank you for the detailed questions! Let me clarify the > > >>>>>>> implementation > > >>>>>>>>>>> approach: > > >>>>>>>>>>> > > >>>>>>>>>>> Which Method Will Be Called? > > >>>>>>>>>>> > > >>>>>>>>>>> GlobalThread will call the NEW handleError() method (not > > the > > >>>>>>>>>> deprecated > > >>>>>>>>>>> handle()). > > >>>>>>>>>>> > > >>>>>>>>>>> Key Point: The exception handler is not called directly by > > >>>>>>>>>> GlobalThread. > > >>>>>>>>>>> Instead, it's called by ProcessorNode.process(), which already > > >>>>>>> invokes > > >>>>>>>>>>> handleError() for regular processors. > > >>>>>>>>>>> > > >>>>>>>>>>> The implementation is straightforward: > > >>>>>>>>>>> > > >>>>>>>>>>> Current code (GlobalStateUpdateTask.initTopology - line > > 161): > > >>>>>>>>>>> node.init((InternalProcessorContext) > > this.processorContext); > > >>>>>> // > > >>>>>>> No > > >>>>>>>>>>> handler passed > > >>>>>>>>>>> > > >>>>>>>>>>> Proposed change: > > >>>>>>>>>>> node.init((InternalProcessorContext) > > this.processorContext, > > >>>>>>>>>>> processingExceptionHandler); // Pass handler > > >>>>>>>>>>> > > >>>>>>>>>>> Once the handler is passed to ProcessorNode, the same code > > >>>> path > > >>>>>>> that > > >>>>>>>>>>> handles exceptions for regular KStream/KTable processors > > >>>>>>>>>>> (ProcessorNode.process() line 236) will automatically handle > > >>>>>>>>>> GlobalKTable > > >>>>>>>>>>> exceptions: > > >>>>>>>>>>> > > >>>>>>>>>>> Response response = > > >>>>>>>>>>> processingExceptionHandler.handleError(errorHandlerContext, > > >>>>>> record, > > >>>>>>>>>>> exception); > > >>>>>>>>>>> > > >>>>>>>>>>> There's no separate code path for GlobalThread - it reuses > > >>>> the > > >>>>>>>>>> existing > > >>>>>>>>>>> ProcessorNode exception handling mechanism. > > >>>>>>>>>>> > > >>>>>>>>>>> Backward Compatibility > > >>>>>>>>>>> > > >>>>>>>>>>> The handleError() method provides automatic backward > > >>>>>>> compatibility > > >>>>>>>>>> via > > >>>>>>>>>>> its default implementation: > > >>>>>>>>>>> > > >>>>>>>>>>> default Response handleError(...) { > > >>>>>>>>>>> return new Response(Result.from(handle(...)), > > >>>>>>>>>>> Collections.emptyList()); > > >>>>>>>>>>> } > > >>>>>>>>>>> > > >>>>>>>>>>> - If users implement the old handle() method: > > handleError() > > >>>>>>>>>> delegates to > > >>>>>>>>>>> it automatically > > >>>>>>>>>>> - If users implement the new handleError() method: it's > > used > > >>>>>>> directly > > >>>>>>>>>>> - No code changes required for existing applications > > >>>>>>>>>>> > > >>>>>>>>>>> Dead Letter Queue (DLQ) Support > > >>>>>>>>>>> > > >>>>>>>>>>> This is where GlobalKTable differs from regular > > processors: > > >>>>>>>>>>> > > >>>>>>>>>>> The Limitation: GlobalThread does not have a Producer, so > > it > > >>>>>>> cannot > > >>>>>>>>>> send > > >>>>>>>>>>> DLQ records to Kafka. > > >>>>>>>>>>> > > >>>>>>>>>>> Proposed Approach: > > >>>>>>>>>>> > > >>>>>>>>>>> 1. For KIP-1270: When ProcessorNode detects DLQ records > > but > > >>>> the > > >>>>>>>>>> context > > >>>>>>>>>>> doesn't support RecordCollector (i.e., GlobalThread), it will > > log > > >>>>>> a > > >>>>>>>>>> warning > > >>>>>>>>>>> instead of sending: > > >>>>>>>>>>> > > >>>>>>>>>>> log.warn("Dead letter queue records cannot be sent for > > >>>>>>> GlobalKTable > > >>>>>>>>>>> processors " + > > >>>>>>>>>>> "(no producer available). DLQ support for > > >>>> GlobalKTable > > >>>>>>> will > > >>>>>>>>>> be > > >>>>>>>>>>> addressed in a future KIP. " + > > >>>>>>>>>>> "Record details logged: topic={}, headers={}", > > ...); > > >>>>>>>>>>> > > >>>>>>>>>>> 2. Future KIP: Full DLQ support for GlobalKTable (requires > > >>>>>> adding > > >>>>>>>>>>> Producer infrastructure) will be proposed separately, as it's a > > >>>>>>> larger > > >>>>>>>>>>> architectural change. > > >>>>>>>>>>> > > >>>>>>>>>>> How This Avoids User Confusion > > >>>>>>>>>>> > > >>>>>>>>>>> 1. Single handler for all processors: Users configure ONE > > >>>>>>>>>>> ProcessingExceptionHandler that works for both regular and > > global > > >>>>>>>>>> processors > > >>>>>>>>>>> 2. Consistent behavior: Result.RESUME continues, > > Result.FAIL > > >>>>>>> stops - > > >>>>>>>>>> same > > >>>>>>>>>>> for both > > >>>>>>>>>>> 3. Clear limitation: DLQ records are logged (not sent) for > > >>>>>>>>>> GlobalKTable, > > >>>>>>>>>>> with explicit warning message > > >>>>>>>>>>> 4. Documentation: Config docs will clearly state DLQ > > sending > > >>>>>>>>>> limitation > > >>>>>>>>>>> for GlobalKTable > > >>>>>>>>>>> Thanks and Regards > > >>>>>>>>>>> Arpit Goyal > > >>>>>>>>>>> 8861094754 > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> On Mon, Jan 12, 2026 at 7:40 AM Matthias J. Sax < > > >>>> [email protected] > > >>>>>>> > > >>>>>>>>>> wrote: > > >>>>>>>>>>> > > >>>>>>>>>>>> Thanks for the KIP Arpit. > > >>>>>>>>>>>> > > >>>>>>>>>>>> Can you elaborate a little bit more on details? With the newly > > >>>>>>> added > > >>>>>>>>>> DLQ > > >>>>>>>>>>>> support for regular `Processor`, the existing > > >>>>>>>>>>>> `ProcessingHandlerResponse` and corresponding method > > `handle()` > > >>>>>> are > > >>>>>>>>>>>> deprecated with upcoming 4.2 release. > > >>>>>>>>>>>> > > >>>>>>>>>>>> Thus, from AK 4.2+ going forward, users are expected to not > > >>>>>>> implement > > >>>>>>>>>>>> the old `handle()` (even if it's still supported, as long as > > the > > >>>>>>> new > > >>>>>>>>>>>> `handleError` is not overwritten). > > >>>>>>>>>>>> > > >>>>>>>>>>>> Are you proposing, for now, to only add support for the > > >>>>>> deprecated > > >>>>>>>>>>>> `handle()` method, ie, the new `handleError()` method would > > not > > >>>>>> be > > >>>>>>>>>>>> called by the global-thread code? If yes, this might be > > >>>> confusing > > >>>>>>> for > > >>>>>>>>>>>> users? > > >>>>>>>>>>>> > > >>>>>>>>>>>> If you do not propose this, would it imply that the > > >>>> global-thread > > >>>>>>> code > > >>>>>>>>>>>> would call the new `handlerError()` method? For this case, the > > >>>>>>> question > > >>>>>>>>>>>> is what the runtime would do if users try to use the DLQ > > >>>> feature? > > >>>>>>>>>>>> > > >>>>>>>>>>>> Overall, it's unclear to me what you propose in detail and how > > >>>> we > > >>>>>>> can > > >>>>>>>>>>>> avoid to confuse users, keep it backward compatible, and make > > it > > >>>>>>> easy > > >>>>>>>>>> to > > >>>>>>>>>>>> understanding how the handler will work. > > >>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>>> -Matthias > > >>>>>>>>>>>> > > >>>>>>>>>>>> On 1/10/26 8:33 AM, Arpit Goyal wrote: > > >>>>>>>>>>>>> Hi Team > > >>>>>>>>>>>>> Just reaching out again.Need your inputs to move it forward > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Thanks and Regards > > >>>>>>>>>>>>> Arpit Goyal > > >>>>>>>>>>>>> 8861094754 > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> On Thu, 8 Jan, 2026, 10:03 pm Arpit Goyal, < > > >>>>>>> [email protected]> > > >>>>>>>>>>>> wrote: > > >>>>>>>>>>>>> > > >>>>>>>>>>>>>> Hi All, > > >>>>>>>>>>>>>> I would like to start a discussion for KIP-1270. This KIP > > >>>>>>> extends > > >>>>>>>>>>>>>> ProcessingExceptionHandler support to GlobalKTable > > processors, > > >>>>>>>>>> enabling > > >>>>>>>>>>>>>> consistent exception handling across all stream processing > > >>>>>> types. > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> * Current Behavior* > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> When a processing exception occurs in a GlobalKTable > > >>>>>>> processor: > > >>>>>>>>>>>>>> - The exception propagates to GlobalStreamThread > > >>>>>>>>>>>>>> - The GlobalStreamThread terminates > > >>>>>>>>>>>>>> - The entire Kafka Streams application shuts down > > >>>>>>>>>>>>>> - No user-configurable exception handling is available > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> * Proposed Behavior* > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> After this KIP, when a processing exception occurs in > > a > > >>>>>>>>>> GlobalKTable > > >>>>>>>>>>>>>> processor: > > >>>>>>>>>>>>>> - The configured > > ProcessingExceptionHandler.handleError() > > >>>>>>> will be > > >>>>>>>>>>>> invoked > > >>>>>>>>>>>>>> - If the handler returns Result.RESUME, processing > > >>>>>> continues > > >>>>>>>>>> with the > > >>>>>>>>>>>>>> next record > > >>>>>>>>>>>>>> - If the handler returns Result.FAIL, the exception > > >>>>>>> propagates > > >>>>>>>>>> (same > > >>>>>>>>>>>> as > > >>>>>>>>>>>>>> current behavior) > > >>>>>>>>>>>>>> - If no handler is configured, behavior remains > > unchanged > > >>>>>>>>>> (backward > > >>>>>>>>>>>>>> compatible) > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> KIP: > > >>>>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>> > > >>>>>>> > > >>>>>> > > >>>> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Introduce+ProcessExceptionalHandler+for+GlobalThread > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> Thanks and Regards > > >>>>>>>>>>>>>> Arpit Goyal > > >>>>>>>>>>>>>> 8861094754 > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>> > > >>>>>> > > >>>>> > > >>>> > > >>>> > > > > > > >
