Thanks Arpit. LGTM.
-Matthias
On 2/23/26 2:13 AM, Lucas Brutschy via dev wrote:
Thanks for the updates, Arpit
On Mon, Feb 23, 2026 at 1:31 AM Arpit Goyal <[email protected]> wrote:
Thanks, Matthias. Yes, HIGH would make sense as we will be deprecating
this config in future, we want people to handle the error for Global
Processor failure.
I have updated the KIP with these details.
Thanks and Regards
Arpit Goyal
8861094754
On Mon, Feb 23, 2026 at 2:07 AM Matthias J. Sax <[email protected]> wrote:
Thanks for updating the KIP Arpit.
Two small things. Can we add a `@Deprecated` tag to the table in the
wiki page listing the new config? The description already explains it,
but it's easy to miss; adding the tag makes it easier to the reader.
The other thing we should add is the "importance level" of the config.
For this case, as we want people to enable it, I believe HIGH would be
the right level?
-Matthias
On 2/20/26 2:21 AM, Arpit Goyal wrote:
Thanks, Lucas, for the feedback.
I have addressed all the points mentioned above and updated the KIP.
Thanks and Regards
Arpit Goyal
8861094754
On Fri, Feb 20, 2026 at 3:12 PM Lucas Brutschy via dev <
[email protected]>
wrote:
Hi Arpit!
This mostly looks good to me.
lb3: How about processing.exception.handler.global.enabled instead of
default.processing.exception.handler.invoke.for.global.
lb4: Matthias explicitly asked for the KIP to state that handleError()
(not the deprecated handle()) will be used. I did not see that in the
text.
lb5: For the DLQ case, KIP still doesn't specify what is logged — full
record content (key/value) or just metadata (topic, partition, offset,
headers).
lb6: nit: Title seems a bit misleading actually. it's not introducing
a ProcessExceptionalHandler for GlobalThread, more like ""Extend
ProcessingExceptionHandler to GlobalKTable processors"
I think we can accept it for 4.3, but maybe take a look at these
suggestions before that.
Cheers,
Lucas
On Mon, Feb 16, 2026 at 5:59 PM Arpit Goyal <[email protected]>
wrote:
Thanks, Matthias and Lucas, for the input. I have updated the KIP with
the
deprecated config detail.
Thanks and Regards
Arpit Goyal
8861094754
On Thu, Feb 12, 2026 at 11:50 AM Arpit Goyal <[email protected]
wrote:
Yes I do agree on having a deprecated config. If everyone is aligned i
would update the KIP.
Thanks and Regards
Arpit Goyal
8861094754
On Thu, 12 Feb, 2026, 10:04 am Matthias J. Sax, <[email protected]>
wrote:
Yes, that's the idea.
-Matthias
On 2/11/26 4:44 PM, Arpit Goyal wrote:
Thanks, Lucas. Just to confirm, we are suggesting a kind of feature
flag
config for this KIP, which if enabled then only global processor
would
use the existing handler otherwise restore to default
implementation.
Thanks and Regards
Arpit Goyal
8861094754
On Thu, Feb 12, 2026 at 1:02 AM Lucas Brutschy via dev <
[email protected]>
wrote:
Hi,
I agree that this change could cause the processing error exception
handler to crash under very specific circumstances. I think it's a
fairly rare case, but since we are raising this as a feature, not
as a
bug fix, you are right that we should follow the proposed route of
new
deprecated configuration to be on the safe side.
Cheers,
Lucas
On Wed, Feb 11, 2026 at 2:48 PM Arpit Goyal <
[email protected]>
wrote:
Thanks, Matthias, for the valuable input.
Yes, it may crash, depending on the existing handler
implementation,
or
even lead to false information if alerting/logging is implemented
in
the
handler.
But what I feel is that even if the handler crashes in the new
implementation, we are no worse off than the current state, where
GlobalKTable exceptions already crash or shut down the
application.
Thanks and Regards
Arpit Goyal
8861094754
On Wed, Feb 11, 2026 at 1:39 PM Matthias J. Sax <[email protected]
wrote:
Arpit,
I was just re-reading the KIP, and I am wondering if the proposed
change
is really fully backward compatible? The KIP says:
This change is fully backward compatible:
- Applications without a configured handler experience no
behavior
change
- Applications with a configured handler automatically get
GlobalKTable support
The second point seems to be a change in behavior though? In the
very
extreme case, the handler code might not be prepared for this
case,
and
could crash, or cause undesired side-effects?
So I am wondering, if we would actually need a config which by
default
would keep this new feature disabled, and user would actively
need to
change the config to enable it?
If we go this route, we could also immediately _deprecate_ this
new
config (yes, it's a little odd to add a new config an deprecate
it
right
away, but we have done this in the past), informing users that
with
Kafka Streams 5.0, the handler will be invoked for "global
Processor"
errors by default (and it's not possible to turn it off any
longer).
Maybe I am too worried here, but based on experience, users do
many
unexpected things, and to guard ourselves against surprises, it
might
be
the safer option, to play it conservative?
Thoughts? -- Before you make any changes to the KIP, let's hear
from
others. I guess I can also be convinces that the propose change
is
safe
enough as-is, and introducing a config would be overkill...
-Matthias
On 2/2/26 9:04 AM, Arpit Goyal wrote:
Hi Everyone
I initiated the voting thread for this KIP.
Thanks and Regards
Arpit Goyal
8861094754
On Tue, 27 Jan, 2026, 10:41 pm Arpit Goyal, <
[email protected]>
wrote:
Thanks everyone for the input. Should I start voting on it ?
Thanks and Regards
Arpit Goyal
8861094754
On Tue, 27 Jan, 2026, 2:10 pm Arpit Goyal, <
[email protected]>
wrote:
Thanks Matthias.
That makes sense. Client can use the single handler
implementation to
support error handling for both Stream Thread and Global
thread.
There
is
no need to introduce ThreadType parameter or another
configuration
for
the
same.
@Lucas Brutschy <[email protected]> It must answered
your
query
?
Thanks and Regards
Arpit Goyal
8861094754
On Tue, Jan 27, 2026 at 11:43 AM Matthias J. Sax <
[email protected]
wrote:
I don't think we would need multiple handlers. The handler is
invoked
passing in `ErrorHandlerContext` parameter, which provides
enough
information to distinguish the case (eg, topic(),
processorNodeId(),
and
taskId()), so users can implement different logic inside the
same
handler for the different cases if necessary.
-Matthias
On 1/23/26 10:05 AM, Arpit Goyal wrote:
Thanks Bill and Lucas for the feedback.
LB1: I was wondering precisely what we are logging in the
DLQ
case.
Do
you intend to log the full record content to
make he
record
content
recoverable, or only some metadata. I suppose
it's
the
latter.
> Since GlobalKTable lacks producer infrastructure,
DLQ
records
will be
logged with full metadata but NOT sent to a Kafka topic
LB2: Maybe I missed it (also not super fluent in that
part of
the
code), but will the implementers of the
`ProcessExceptionalHandler` be
able to tell whether the error originated from
a
globalThread or a
StreamsThread? The implementer may want to
specialize
handling for
each case. This is not a must, but would be a
nice
to
have
for
sure.
>. Great question! We have two options here
Option 1: Single Handler Configuration
Users define one implementation of
ProcessingExceptionHandler that handles errors for all
stream
types
(KStream, KTable, and GlobalKTable). This maintains
consistency with the existing
DeserializationExceptionHandler pattern.
Limitation: This will enforce the same handling
behavior
for
global exception handling that we defined for KStream
processing
exception
handling. This keeps things
simple but is not flexible
enough
for
users who
may want different behavior for GlobalKTable.
Option 2: Separate Optional Configuration
Introduce a new optional
configuration:
global.processing.exception.handler. If configured, it
applies
specifically
to GlobalKTable processing errors; if not
configured, exceptions bubble
up
to
the
uncaught
exception handler (maintaining current behavior and backward
compatibility).
Limitation: Requires two configuration
properties
if
users want
exception handling for both regular streams and
GlobalKTable.
With Option 1 - ProcessExceptionalHandler
does
not
have a
way
to identify which thread is invoking it as of now. We may
need
to
introduce
ThreadType(Stream or Global) in errorHandlerContext with
ThreadType
information.
With Option 2 - Client would always be aware
of
the
class
it
has implemented for GlobalKTables.
Thanks and Regards
Arpit Goyal
8861094754
On Fri, Jan 23, 2026 at 9:43 PM Bill Bejeck <
[email protected]
wrote:
Hi All,
Thanks for the KIP! Makes sense to me and helps make KS
more
robust.
I don't have any additional comments beyond what's been
said so
far.
-Bill
On Fri, Jan 23, 2026 at 5:52 AM Lucas Brutschy via dev <
[email protected]>
wrote:
Hi,
Overall, this makes sense to me. Thanks for the KIP!
LB1: I was wondering precisely what we are logging in the
DLQ
case.
Do
you intend to log the full record content to make he
record
content
recoverable, or only some metadata. I suppose it's the
latter.
LB2: Maybe I missed it (also not super fluent in that
part of
the
code), but will the implementers of the
`ProcessExceptionalHandler`
be
able to tell whether the error originated from a
globalThread
or a
StreamsThread? The implementer may want to specialize
handling
for
each case. This is not a must, but would be a nice to
have for
sure.
Cheers,
Lucas
On Thu, Jan 15, 2026 at 8:55 AM Arpit Goyal <
[email protected]>
wrote:
Hi All,
Looking for more inputs and feedback. This would help to
move
this
KIP
forward.
Thanks and Regards
Arpit Goyal
8861094754
On Tue, 13 Jan, 2026, 2:17 pm Arpit Goyal, <
[email protected]>
wrote:
Thanks for the response Matthias.
I have updated the KIP to include KIP-1034 handleError()
automatic
backward compatibility. DLQ part I already mentioned
under
the
Limitation
section. Let me know if it needs to be improved further.
Thanks and Regards
Arpit Goyal
8861094754
On Tue, Jan 13, 2026 at 5:05 AM Matthias J. Sax <
[email protected]>
wrote:
Thanks for the clarification. Make sense to me.
Might be good to add some of these details (no code
reference to
`ProcessorNode` etc necessary as it's impl detail) to
the
KIP.
Ie,
state
explicitly that the new handleError() will be used and
that
it
provides
backward compatibility automatically based on it's
current
implementaion
from KIP-1034.
And that DLQ records, if returned, would be ignored and
dropped
and
a
warning is logged about it for this case.
-Matthias
On 1/12/26 2:29 AM, Arpit Goyal wrote:
Thank you for the detailed questions! Let me clarify
the
implementation
approach:
Which Method Will Be Called?
GlobalThread will call the NEW handleError()
method
(not
the
deprecated
handle()).
Key Point: The exception handler is not called
directly by
GlobalThread.
Instead, it's called by ProcessorNode.process(), which
already
invokes
handleError() for regular processors.
The implementation is straightforward:
Current code
(GlobalStateUpdateTask.initTopology -
line
161):
node.init((InternalProcessorContext)
this.processorContext);
//
No
handler passed
Proposed change:
node.init((InternalProcessorContext)
this.processorContext,
processingExceptionHandler); // Pass handler
Once the handler is passed to ProcessorNode,
the
same
code
path
that
handles exceptions for regular KStream/KTable
processors
(ProcessorNode.process() line 236) will automatically
handle
GlobalKTable
exceptions:
Response response =
processingExceptionHandler.handleError(errorHandlerContext,
record,
exception);
There's no separate code path for GlobalThread
- it
reuses
the
existing
ProcessorNode exception handling mechanism.
Backward Compatibility
The handleError() method provides automatic
backward
compatibility
via
its default implementation:
default Response handleError(...) {
return new
Response(Result.from(handle(...)),
Collections.emptyList());
}
- If users implement the old handle() method:
handleError()
delegates to
it automatically
- If users implement the new handleError()
method:
it's
used
directly
- No code changes required for existing
applications
Dead Letter Queue (DLQ) Support
This is where GlobalKTable differs from regular
processors:
The Limitation: GlobalThread does not have a
Producer, so
it
cannot
send
DLQ records to Kafka.
Proposed Approach:
1. For KIP-1270: When ProcessorNode detects DLQ
records
but
the
context
doesn't support RecordCollector (i.e., GlobalThread),
it
will
log
a
warning
instead of sending:
log.warn("Dead letter queue records cannot be
sent
for
GlobalKTable
processors " +
"(no producer available). DLQ support
for
GlobalKTable
will
be
addressed in a future KIP. " +
"Record details logged: topic={},
headers={}",
...);
2. Future KIP: Full DLQ support for
GlobalKTable
(requires
adding
Producer infrastructure) will be proposed separately,
as
it's a
larger
architectural change.
How This Avoids User Confusion
1. Single handler for all processors: Users
configure
ONE
ProcessingExceptionHandler that works for both
regular and
global
processors
2. Consistent behavior: Result.RESUME
continues,
Result.FAIL
stops -
same
for both
3. Clear limitation: DLQ records are logged
(not
sent) for
GlobalKTable,
with explicit warning message
4. Documentation: Config docs will clearly
state DLQ
sending
limitation
for GlobalKTable
Thanks and Regards
Arpit Goyal
8861094754
On Mon, Jan 12, 2026 at 7:40 AM Matthias J. Sax <
[email protected]
wrote:
Thanks for the KIP Arpit.
Can you elaborate a little bit more on details? With
the
newly
added
DLQ
support for regular `Processor`, the existing
`ProcessingHandlerResponse` and corresponding method
`handle()`
are
deprecated with upcoming 4.2 release.
Thus, from AK 4.2+ going forward, users are expected
to
not
implement
the old `handle()` (even if it's still supported, as
long
as
the
new
`handleError` is not overwritten).
Are you proposing, for now, to only add support for
the
deprecated
`handle()` method, ie, the new `handleError()` method
would
not
be
called by the global-thread code? If yes, this might
be
confusing
for
users?
If you do not propose this, would it imply that the
global-thread
code
would call the new `handlerError()` method? For this
case, the
question
is what the runtime would do if users try to use the
DLQ
feature?
Overall, it's unclear to me what you propose in
detail
and how
we
can
avoid to confuse users, keep it backward compatible,
and
make
it
easy
to
understanding how the handler will work.
-Matthias
On 1/10/26 8:33 AM, Arpit Goyal wrote:
Hi Team
Just reaching out again.Need your inputs to move it
forward
Thanks and Regards
Arpit Goyal
8861094754
On Thu, 8 Jan, 2026, 10:03 pm Arpit Goyal, <
[email protected]>
wrote:
Hi All,
I would like to start a discussion for KIP-1270.
This
KIP
extends
ProcessingExceptionHandler support to GlobalKTable
processors,
enabling
consistent exception handling across all stream
processing
types.
* Current Behavior*
When a processing exception occurs in a
GlobalKTable
processor:
- The exception propagates to
GlobalStreamThread
- The GlobalStreamThread terminates
- The entire Kafka Streams application
shuts
down
- No user-configurable exception handling
is
available
* Proposed Behavior*
After this KIP, when a processing exception
occurs in
a
GlobalKTable
processor:
- The configured
ProcessingExceptionHandler.handleError()
will be
invoked
- If the handler returns Result.RESUME,
processing
continues
with the
next record
- If the handler returns Result.FAIL, the
exception
propagates
(same
as
current behavior)
- If no handler is configured, behavior
remains
unchanged
(backward
compatible)
KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Introduce+ProcessExceptionalHandler+for+GlobalThread
Thanks and Regards
Arpit Goyal
8861094754