Hi everyone,

I have initiated the voting thread for this KIP.

Thanks and Regards
Arpit Goyal
8861094754

On Tue, 27 Jan, 2026, 10:41 pm Arpit Goyal, <[email protected]> wrote:

> Thanks everyone for the input. Should I start voting on it?
>
> Thanks and Regards
> Arpit Goyal
> 8861094754
>
> On Tue, 27 Jan, 2026, 2:10 pm Arpit Goyal, <[email protected]>
> wrote:
>
>> Thanks Matthias.
>> That makes sense. Clients can use a single handler implementation to
>> support error handling for both the stream thread and the global thread.
>> There is no need to introduce a ThreadType parameter or another
>> configuration for this.
>> @Lucas Brutschy <[email protected]> Does this answer your query?
>>
>> Thanks and Regards
>> Arpit Goyal
>> 8861094754
>>
>>
>> On Tue, Jan 27, 2026 at 11:43 AM Matthias J. Sax <[email protected]>
>> wrote:
>>
>>> I don't think we would need multiple handlers. The handler is invoked
>>> passing in an `ErrorHandlerContext` parameter, which provides enough
>>> information to distinguish the case (eg, topic(), processorNodeId(), and
>>> taskId()), so users can implement different logic inside the same
>>> handler for the different cases if necessary.
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 1/23/26 10:05 AM, Arpit Goyal wrote:
>>> > Thanks Bill and Lucas for the feedback.
>>> >
>>> > LB1: I was wondering precisely what we are logging in the DLQ case. Do
>>> > you intend to log the full record content to make the record content
>>> > recoverable, or only some metadata. I suppose it's the latter.
>>> >
>>> > > Since GlobalKTable lacks producer infrastructure, DLQ records will be
>>> > > logged with full metadata but NOT sent to a Kafka topic.
>>> >
>>> > LB2: Maybe I missed it (also not super fluent in that part of the
>>> > code), but will the implementers of the `ProcessingExceptionHandler` be
>>> > able to tell whether the error originated from a globalThread or a
>>> > StreamsThread? The implementer may want to specialize handling for
>>> > each case. This is not a must, but would be a nice to have for sure.
>>> >
>>> > > Great question! We have two options here:
>>> >
>>> > Option 1: Single Handler Configuration
>>> >
>>> > Users define one implementation of ProcessingExceptionHandler that
>>> > handles errors for all stream types (KStream, KTable, and
>>> > GlobalKTable). This maintains consistency with the existing
>>> > DeserializationExceptionHandler pattern.
>>> > Limitation: This will enforce the same handling behavior for
>>> > global exception handling that we defined for KStream processing
>>> > exception handling. This keeps things simple but is not flexible
>>> > enough for users who may want different behavior for GlobalKTable.
>>> >
>>> > Option 2: Separate Optional Configuration
>>> >
>>> > Introduce a new optional configuration:
>>> > global.processing.exception.handler. If configured, it applies
>>> > specifically to GlobalKTable processing errors; if not configured,
>>> > exceptions bubble up to the uncaught exception handler (maintaining
>>> > current behavior and backward compatibility).
>>> > Limitation: Requires two configuration properties if users want
>>> > exception handling for both regular streams and GlobalKTable.
>>> >
>>> > With Option 1 - ProcessingExceptionHandler does not have a way
>>> > to identify which thread is invoking it as of now. We may need to
>>> > add a ThreadType (Stream or Global) to ErrorHandlerContext.
>>> > With Option 2 - Client would always be aware of the class it
>>> > has implemented for GlobalKTables.
>>> >
>>> > Thanks and Regards
>>> > Arpit Goyal
>>> > 8861094754
>>> >
>>> >
>>> > On Fri, Jan 23, 2026 at 9:43 PM Bill Bejeck <[email protected]> wrote:
>>> >
>>> >> Hi All,
>>> >>
>>> >> Thanks for the KIP! Makes sense to me and helps make KS more robust.
>>> >> I don't have any additional comments beyond what's been said so far.
>>> >>
>>> >> -Bill
>>> >>
>>> >> On Fri, Jan 23, 2026 at 5:52 AM Lucas Brutschy via dev <
>>> >> [email protected]>
>>> >> wrote:
>>> >>
>>> >>> Hi,
>>> >>>
>>> >>> Overall, this makes sense to me. Thanks for the KIP!
>>> >>>
>>> >>> LB1: I was wondering precisely what we are logging in the DLQ case. Do
>>> >>> you intend to log the full record content to make the record content
>>> >>> recoverable, or only some metadata. I suppose it's the latter.
>>> >>>
>>> >>> LB2: Maybe I missed it (also not super fluent in that part of the
>>> >>> code), but will the implementers of the `ProcessingExceptionHandler` be
>>> >>> able to tell whether the error originated from a globalThread or a
>>> >>> StreamsThread? The implementer may want to specialize handling for
>>> >>> each case. This is not a must, but would be a nice to have for sure.
>>> >>>
>>> >>> Cheers,
>>> >>> Lucas
>>> >>>
>>> >>> On Thu, Jan 15, 2026 at 8:55 AM Arpit Goyal <[email protected]>
>>> >>> wrote:
>>> >>>>
>>> >>>> Hi All,
>>> >>>> Looking for more inputs and feedback. This would help to move this KIP
>>> >>>> forward.
>>> >>>>
>>> >>>>
>>> >>>> Thanks and Regards
>>> >>>> Arpit Goyal
>>> >>>> 8861094754
>>> >>>>
>>> >>>> On Tue, 13 Jan, 2026, 2:17 pm Arpit Goyal, <[email protected]>
>>> >>>> wrote:
>>> >>>>
>>> >>>>> Thanks for the response Matthias.
>>> >>>>> I have updated the KIP to include KIP-1034 handleError() automatic
>>> >>>>> backward compatibility. The DLQ part I already mentioned under the
>>> >>>>> Limitation section. Let me know if it needs to be improved further.
>>> >>>>> Thanks and Regards
>>> >>>>> Arpit Goyal
>>> >>>>> 8861094754
>>> >>>>>
>>> >>>>>
>>> >>>>> On Tue, Jan 13, 2026 at 5:05 AM Matthias J. Sax <[email protected]>
>>> >>>>> wrote:
>>> >>>>>
>>> >>>>>> Thanks for the clarification. Makes sense to me.
>>> >>>>>>
>>> >>>>>> Might be good to add some of these details (no code reference to
>>> >>>>>> `ProcessorNode` etc necessary as it's impl detail) to the KIP. Ie,
>>> >>>>>> state explicitly that the new handleError() will be used and that it
>>> >>>>>> provides backward compatibility automatically based on its current
>>> >>>>>> implementation from KIP-1034.
>>> >>>>>>
>>> >>>>>> And that DLQ records, if returned, would be ignored and dropped and
>>> >>>>>> a warning is logged about it for this case.
>>> >>>>>>
>>> >>>>>>
>>> >>>>>>
>>> >>>>>> -Matthias
>>> >>>>>>
>>> >>>>>>
>>> >>>>>> On 1/12/26 2:29 AM, Arpit Goyal wrote:
>>> >>>>>>> Thank you for the detailed questions! Let me clarify the
>>> >>>>>>> implementation approach:
>>> >>>>>>>
>>> >>>>>>> Which Method Will Be Called?
>>> >>>>>>>
>>> >>>>>>> GlobalThread will call the NEW handleError() method (not the
>>> >>>>>>> deprecated handle()).
>>> >>>>>>>
>>> >>>>>>> Key Point: The exception handler is not called directly by
>>> >>>>>>> GlobalThread. Instead, it's called by ProcessorNode.process(),
>>> >>>>>>> which already invokes handleError() for regular processors.
>>> >>>>>>>
>>> >>>>>>> The implementation is straightforward:
>>> >>>>>>>
>>> >>>>>>> Current code (GlobalStateUpdateTask.initTopology - line 161):
>>> >>>>>>> node.init((InternalProcessorContext) this.processorContext); // No handler passed
>>> >>>>>>>
>>> >>>>>>> Proposed change:
>>> >>>>>>> node.init((InternalProcessorContext) this.processorContext,
>>> >>>>>>>     processingExceptionHandler); // Pass handler
>>> >>>>>>>
>>> >>>>>>> Once the handler is passed to ProcessorNode, the same code path that
>>> >>>>>>> handles exceptions for regular KStream/KTable processors
>>> >>>>>>> (ProcessorNode.process() line 236) will automatically handle
>>> >>>>>>> GlobalKTable exceptions:
>>> >>>>>>>
>>> >>>>>>> Response response =
>>> >>>>>>>     processingExceptionHandler.handleError(errorHandlerContext, record, exception);
>>> >>>>>>>
>>> >>>>>>> There's no separate code path for GlobalThread - it reuses the
>>> >>>>>>> existing ProcessorNode exception handling mechanism.
>>> >>>>>>>
>>> >>>>>>> Backward Compatibility
>>> >>>>>>>
>>> >>>>>>> The handleError() method provides automatic backward compatibility
>>> >>>>>>> via its default implementation:
>>> >>>>>>>
>>> >>>>>>> default Response handleError(...) {
>>> >>>>>>>     return new Response(Result.from(handle(...)),
>>> >>>>>>>         Collections.emptyList());
>>> >>>>>>> }
>>> >>>>>>>
>>> >>>>>>> - If users implement the old handle() method: handleError()
>>> >>>>>>>   delegates to it automatically
>>> >>>>>>> - If users implement the new handleError() method: it's used directly
>>> >>>>>>> - No code changes required for existing applications
>>> >>>>>>>
>>> >>>>>>> Dead Letter Queue (DLQ) Support
>>> >>>>>>>
>>> >>>>>>> This is where GlobalKTable differs from regular processors:
>>> >>>>>>>
>>> >>>>>>> The Limitation: GlobalThread does not have a Producer, so it cannot
>>> >>>>>>> send DLQ records to Kafka.
>>> >>>>>>>
>>> >>>>>>> Proposed Approach:
>>> >>>>>>>
>>> >>>>>>> 1. For KIP-1270: When ProcessorNode detects DLQ records but the
>>> >>>>>>>    context doesn't support RecordCollector (i.e., GlobalThread), it
>>> >>>>>>>    will log a warning instead of sending:
>>> >>>>>>>
>>> >>>>>>> log.warn("Dead letter queue records cannot be sent for GlobalKTable processors " +
>>> >>>>>>>     "(no producer available). DLQ support for GlobalKTable will be addressed in a future KIP. " +
>>> >>>>>>>     "Record details logged: topic={}, headers={}", ...);
>>> >>>>>>>
>>> >>>>>>> 2. Future KIP: Full DLQ support for GlobalKTable (requires adding
>>> >>>>>>>    Producer infrastructure) will be proposed separately, as it's a
>>> >>>>>>>    larger architectural change.
>>> >>>>>>>
>>> >>>>>>> How This Avoids User Confusion
>>> >>>>>>>
>>> >>>>>>> 1. Single handler for all processors: Users configure ONE
>>> >>>>>>>    ProcessingExceptionHandler that works for both regular and global
>>> >>>>>>>    processors
>>> >>>>>>> 2. Consistent behavior: Result.RESUME continues, Result.FAIL stops -
>>> >>>>>>>    same for both
>>> >>>>>>> 3. Clear limitation: DLQ records are logged (not sent) for
>>> >>>>>>>    GlobalKTable, with explicit warning message
>>> >>>>>>> 4. Documentation: Config docs will clearly state DLQ sending
>>> >>>>>>>    limitation for GlobalKTable
>>> >>>>>>>
>>> >>>>>>> Thanks and Regards
>>> >>>>>>> Arpit Goyal
>>> >>>>>>> 8861094754
>>> >>>>>>>
>>> >>>>>>>
>>> >>>>>>> On Mon, Jan 12, 2026 at 7:40 AM Matthias J. Sax <[email protected]>
>>> >>>>>>> wrote:
>>> >>>>>>>
>>> >>>>>>>> Thanks for the KIP Arpit.
>>> >>>>>>>>
>>> >>>>>>>> Can you elaborate a little bit more on details? With the newly added
>>> >>>>>>>> DLQ support for regular `Processor`, the existing
>>> >>>>>>>> `ProcessingHandlerResponse` and corresponding method `handle()` are
>>> >>>>>>>> deprecated with upcoming 4.2 release.
>>> >>>>>>>>
>>> >>>>>>>> Thus, from AK 4.2+ going forward, users are expected to not implement
>>> >>>>>>>> the old `handle()` (even if it's still supported, as long as the new
>>> >>>>>>>> `handleError` is not overwritten).
>>> >>>>>>>>
>>> >>>>>>>> Are you proposing, for now, to only add support for the deprecated
>>> >>>>>>>> `handle()` method, ie, the new `handleError()` method would not be
>>> >>>>>>>> called by the global-thread code? If yes, this might be confusing for
>>> >>>>>>>> users?
>>> >>>>>>>>
>>> >>>>>>>> If you do not propose this, would it imply that the global-thread code
>>> >>>>>>>> would call the new `handleError()` method? For this case, the question
>>> >>>>>>>> is what the runtime would do if users try to use the DLQ feature?
>>> >>>>>>>>
>>> >>>>>>>> Overall, it's unclear to me what you propose in detail and how we can
>>> >>>>>>>> avoid confusing users, keep it backward compatible, and make it easy
>>> >>>>>>>> to understand how the handler will work.
>>> >>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>> -Matthias
>>> >>>>>>>>
>>> >>>>>>>> On 1/10/26 8:33 AM, Arpit Goyal wrote:
>>> >>>>>>>>> Hi Team
>>> >>>>>>>>> Just reaching out again. Need your inputs to move it forward.
>>> >>>>>>>>>
>>> >>>>>>>>> Thanks and Regards
>>> >>>>>>>>> Arpit Goyal
>>> >>>>>>>>> 8861094754
>>> >>>>>>>>>
>>> >>>>>>>>> On Thu, 8 Jan, 2026, 10:03 pm Arpit Goyal, <[email protected]>
>>> >>>>>>>>> wrote:
>>> >>>>>>>>>
>>> >>>>>>>>>> Hi All,
>>> >>>>>>>>>> I would like to start a discussion for KIP-1270. This KIP extends
>>> >>>>>>>>>> ProcessingExceptionHandler support to GlobalKTable processors,
>>> >>>>>>>>>> enabling consistent exception handling across all stream
>>> >>>>>>>>>> processing types.
>>> >>>>>>>>>>
>>> >>>>>>>>>> *Current Behavior*
>>> >>>>>>>>>>
>>> >>>>>>>>>> When a processing exception occurs in a GlobalKTable processor:
>>> >>>>>>>>>> - The exception propagates to GlobalStreamThread
>>> >>>>>>>>>> - The GlobalStreamThread terminates
>>> >>>>>>>>>> - The entire Kafka Streams application shuts down
>>> >>>>>>>>>> - No user-configurable exception handling is available
>>> >>>>>>>>>>
>>> >>>>>>>>>> *Proposed Behavior*
>>> >>>>>>>>>>
>>> >>>>>>>>>> After this KIP, when a processing exception occurs in a
>>> >>>>>>>>>> GlobalKTable processor:
>>> >>>>>>>>>> - The configured ProcessingExceptionHandler.handleError() will be
>>> >>>>>>>>>>   invoked
>>> >>>>>>>>>> - If the handler returns Result.RESUME, processing continues with
>>> >>>>>>>>>>   the next record
>>> >>>>>>>>>> - If the handler returns Result.FAIL, the exception propagates
>>> >>>>>>>>>>   (same as current behavior)
>>> >>>>>>>>>> - If no handler is configured, behavior remains unchanged
>>> >>>>>>>>>>   (backward compatible)
>>> >>>>>>>>>>
>>> >>>>>>>>>> KIP:
>>> >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Introduce+ProcessExceptionalHandler+for+GlobalThread
>>> >>>>>>>>>>
>>> >>>>>>>>>> Thanks and Regards
>>> >>>>>>>>>> Arpit Goyal
>>> >>>>>>>>>> 8861094754
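
For readers of the archive: the single-handler approach that Matthias suggests in the thread (branching on what `ErrorHandlerContext` exposes rather than adding a second config) might look roughly like the sketch below. It is deliberately self-contained, so `Context`, `Result`, and `TopicAwareHandler` are simplified stand-ins invented for this illustration, not the real Kafka Streams types. The idea that the application knows which topics back its GlobalKTables is an assumption of the sketch, not something the KIP states, and the RESUME/FAIL policy is only an example.

```java
import java.util.Set;

public class SingleHandlerSketch {

    /** Simplified stand-in for the handler's result; not the real Kafka Streams type. */
    public enum Result { RESUME, FAIL }

    /** Hypothetical stand-in exposing only the three accessors named in the thread. */
    public static final class Context {
        private final String topic;
        private final String processorNodeId;
        private final String taskId;

        public Context(String topic, String processorNodeId, String taskId) {
            this.topic = topic;
            this.processorNodeId = processorNodeId;
            this.taskId = taskId;
        }

        public String topic() { return topic; }
        public String processorNodeId() { return processorNodeId; }
        public String taskId() { return taskId; }
    }

    /** One handler for all processors; it branches on the context instead of a thread type. */
    public static final class TopicAwareHandler {
        // Assumption: the application knows which topics feed its GlobalKTables.
        private final Set<String> globalTableTopics;

        public TopicAwareHandler(Set<String> globalTableTopics) {
            this.globalTableTopics = Set.copyOf(globalTableTopics);
        }

        public Result handleError(Context context, Exception exception) {
            if (globalTableTopics.contains(context.topic())) {
                // Example policy: skip bad records while populating a global table.
                return Result.RESUME;
            }
            // Example policy: fail fast for everything else.
            return Result.FAIL;
        }
    }

    public static void main(String[] args) {
        TopicAwareHandler handler = new TopicAwareHandler(Set.of("global-config"));
        Exception error = new IllegalStateException("bad record");

        // Node and task ids below are placeholder values for the demo only.
        Result global = handler.handleError(
            new Context("global-config", "node-a", "task-a"), error);
        Result regular = handler.handleError(
            new Context("orders", "node-b", "task-b"), error);

        System.out.println("global-config -> " + global);
        System.out.println("orders -> " + regular);
    }
}
```

The same branching could key off `processorNodeId()` or `taskId()` instead of the topic; which accessor is the most reliable discriminator for global-thread errors is not settled in the thread.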

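
The backward-compatibility and DLQ behavior described in the thread (the new `handleError()` delegating to the old `handle()` by default, and DLQ records being dropped with a warning when no producer is available) can be sketched as follows. This is a minimal model under stated assumptions: `Handler`, `Response`, `Result`, and `applyWithoutProducer` are hypothetical stand-ins written for this example, DLQ records are modelled as plain strings, and warnings are collected in a list instead of going through a logger. The real interface converts the legacy response via `Result.from(...)`, which this sketch skips by using a single enum.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class HandleErrorCompatSketch {

    /** Simplified stand-in for the handler's result. */
    public enum Result { RESUME, FAIL }

    /** Stand-in response: a result plus DLQ records (plain strings here for brevity). */
    public static final class Response {
        private final Result result;
        private final List<String> deadLetterQueueRecords;

        public Response(Result result, List<String> deadLetterQueueRecords) {
            this.result = result;
            this.deadLetterQueueRecords = List.copyOf(deadLetterQueueRecords);
        }

        public Result result() { return result; }
        public List<String> deadLetterQueueRecords() { return deadLetterQueueRecords; }
    }

    /** Stand-in handler with a legacy method and a newer one that delegates by default. */
    public interface Handler {
        // Legacy entry point; in this sketch it throws unless overridden.
        default Result handle(String topic, Exception exception) {
            throw new UnsupportedOperationException("handle() not implemented");
        }

        // Newer entry point: by default wraps the legacy result with an empty DLQ list.
        default Response handleError(String topic, Exception exception) {
            return new Response(handle(topic, exception), Collections.emptyList());
        }
    }

    /** Caller without a producer: honours the result, drops DLQ records, records a warning. */
    public static Result applyWithoutProducer(Handler handler, String topic,
                                              Exception exception, List<String> warnings) {
        Response response = handler.handleError(topic, exception);
        int dropped = response.deadLetterQueueRecords().size();
        if (dropped > 0) {
            warnings.add("Dropping " + dropped + " DLQ record(s) for topic " + topic
                + ": no producer available");
        }
        return response.result();
    }

    public static void main(String[] args) {
        // Old-style handler: only the legacy method is implemented.
        Handler legacy = new Handler() {
            @Override
            public Result handle(String topic, Exception exception) {
                return Result.RESUME;
            }
        };
        // New-style handler: returns a DLQ record that the caller cannot send.
        Handler withDlq = new Handler() {
            @Override
            public Response handleError(String topic, Exception exception) {
                return new Response(Result.FAIL, List.of("dlq-record"));
            }
        };

        List<String> warnings = new ArrayList<>();
        Exception error = new IllegalStateException("bad record");
        System.out.println(applyWithoutProducer(legacy, "global-config", error, warnings));
        System.out.println(applyWithoutProducer(withDlq, "global-config", error, warnings));
        System.out.println(warnings);
    }
}
```

In this model an existing handler that only overrides `handle()` keeps working unchanged, while a handler that returns DLQ records still has its RESUME/FAIL decision honoured even though the records themselves are discarded.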