Yes, that's the idea.

-Matthias

On 2/11/26 4:44 PM, Arpit Goyal wrote:
Thanks, Lucas. Just to confirm: we are suggesting a feature-flag style
config for this KIP. Only if it is enabled would the global processor use
the existing handler; otherwise it keeps the default (current) behavior.
Thanks and Regards
Arpit Goyal
8861094754


On Thu, Feb 12, 2026 at 1:02 AM Lucas Brutschy via dev <[email protected]>
wrote:

Hi,

I agree that this change could cause the processing exception handler
to crash under very specific circumstances. I think it's a fairly rare
case, but since we are introducing this as a feature, not as a bug fix,
you are right that we should follow the proposed route of adding a new,
immediately deprecated configuration to be on the safe side.

Cheers,
Lucas


On Wed, Feb 11, 2026 at 2:48 PM Arpit Goyal <[email protected]>
wrote:

Thanks, Matthias, for the valuable input.
Yes, it may crash, depending on the existing handler implementation, or
even lead to false information if alerting/logging is implemented in the
handler.
But what I feel is that even if the handler crashes in the new
implementation, we are no worse off than the current state, where
GlobalKTable exceptions already crash or shut down the application.

Thanks and Regards
Arpit Goyal
8861094754


On Wed, Feb 11, 2026 at 1:39 PM Matthias J. Sax <[email protected]>
wrote:

Arpit,

I was just re-reading the KIP, and I am wondering if the proposed change
is really fully backward compatible? The KIP says:

This change is fully backward compatible:
   - Applications without a configured handler experience no behavior change
   - Applications with a configured handler automatically get GlobalKTable support

The second point seems to be a change in behavior though? In the very
extreme case, the handler code might not be prepared for this case, and
could crash, or cause undesired side-effects?

So I am wondering if we would actually need a config which by default
would keep this new feature disabled, so that users would actively need to
change the config to enable it?

If we go this route, we could also immediately _deprecate_ this new
config (yes, it's a little odd to add a new config and deprecate it right
away, but we have done this in the past), informing users that with
Kafka Streams 5.0, the handler will be invoked for "global Processor"
errors by default (and it's not possible to turn it off any longer).
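
To make the opt-in idea concrete, here is a minimal sketch of how such a
flag could gate the handler for the global thread. The config key
`processing.exception.handler.global.enabled` and the class
`GlobalHandlerFlagSketch` are hypothetical names made up for illustration;
the KIP does not define them.

```java
import java.util.Properties;

// Hypothetical sketch: none of these names come from the KIP or from Kafka Streams.
final class GlobalHandlerFlagSketch {

    // Assumed config key, for illustration only.
    static final String GLOBAL_HANDLER_ENABLED =
        "processing.exception.handler.global.enabled";

    // Returns true only if the user explicitly opted in. The default ("false")
    // keeps the current behavior: the handler is not invoked for global processors.
    static boolean handlerAppliesToGlobalThread(Properties props) {
        return Boolean.parseBoolean(props.getProperty(GLOBAL_HANDLER_ENABLED, "false"));
    }

    public static void main(String[] args) {
        Properties defaults = new Properties();
        Properties optedIn = new Properties();
        optedIn.setProperty(GLOBAL_HANDLER_ENABLED, "true");

        System.out.println(handlerAppliesToGlobalThread(defaults));
        System.out.println(handlerAppliesToGlobalThread(optedIn));
    }
}
```

With a default of "false", existing applications see no change until they
flip the flag, which is the conservative behavior described above.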


Maybe I am too worried here, but based on experience, users do many
unexpected things, and to guard ourselves against surprises, it might be
the safer option to play it conservative?


Thoughts? -- Before you make any changes to the KIP, let's hear from
others. I guess I can also be convinced that the proposed change is safe
enough as-is, and introducing a config would be overkill...



-Matthias



On 2/2/26 9:04 AM, Arpit Goyal wrote:
Hi Everyone
I initiated the voting thread for this KIP.

Thanks and Regards
Arpit Goyal
8861094754

On Tue, 27 Jan, 2026, 10:41 pm Arpit Goyal, <[email protected]> wrote:

Thanks everyone for the input. Should I start voting on it ?

Thanks and Regards
Arpit Goyal
8861094754

On Tue, 27 Jan, 2026, 2:10 pm Arpit Goyal, <[email protected]> wrote:

Thanks Matthias.
That makes sense. Clients can use a single handler implementation to
support error handling for both the stream thread and the global thread.
There is no need to introduce a ThreadType parameter or another
configuration for this.
@Lucas Brutschy <[email protected]> I hope this answers your query?

Thanks and Regards
Arpit Goyal
8861094754


On Tue, Jan 27, 2026 at 11:43 AM Matthias J. Sax <[email protected]> wrote:

I don't think we would need multiple handlers. The handler is invoked
with an `ErrorHandlerContext` parameter, which provides enough
information to distinguish the cases (e.g., topic(), processorNodeId(),
and taskId()), so users can implement different logic inside the same
handler for the different cases if necessary.
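
A minimal sketch of that idea, using simplified stand-in types rather than
the real Kafka Streams interfaces. It assumes the application knows which
topics back its GlobalKTables; the class name `SingleHandlerSketch`, the
nested `Context` type, and the fail-fast policy for global topics are all
illustrative assumptions.

```java
import java.util.Set;

// Simplified stand-ins for illustration; these are NOT the real Kafka Streams types.
final class SingleHandlerSketch {

    enum Result { RESUME, FAIL }

    // Stand-in for the metadata the handler receives with each error.
    static final class Context {
        final String topic;
        final String processorNodeId;

        Context(String topic, String processorNodeId) {
            this.topic = topic;
            this.processorNodeId = processorNodeId;
        }
    }

    // Assumption: the application knows which topics back its GlobalKTables.
    private final Set<String> globalTopics;

    SingleHandlerSketch(Set<String> globalTopics) {
        this.globalTopics = globalTopics;
    }

    // One handler, two policies: the branch is chosen from context metadata.
    Result handle(Context context, Exception error) {
        if (globalTopics.contains(context.topic)) {
            // Example policy: global state is shared, so fail fast.
            return Result.FAIL;
        }
        // Example policy: for regular processing, skip the record and continue.
        return Result.RESUME;
    }
}
```

The same branching could key off the processor node id or task id instead
of the topic, depending on what is easiest to recognize in a given topology.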


-Matthias


On 1/23/26 10:05 AM, Arpit Goyal wrote:
Thanks Bill and Lucas for the feedback.

LB1: I was wondering precisely what we are logging in the DLQ case. Do
     you intend to log the full record content to make the record content
     recoverable, or only some metadata. I suppose it's the latter.

  > Since GlobalKTable lacks producer infrastructure, DLQ records will be
    logged with full metadata but NOT sent to a Kafka topic.
LB2: Maybe I missed it (also not super fluent in that part of the
     code), but will the implementers of the `ProcessingExceptionHandler` be
     able to tell whether the error originated from a globalThread or a
     StreamsThread? The implementer may want to specialize handling for
     each case. This is not a must, but would be a nice to have for sure.

  > Great question! We have two options here.

    Option 1: Single Handler Configuration

      Users define one implementation of ProcessingExceptionHandler that
      handles errors for all stream types (KStream, KTable, and
      GlobalKTable). This maintains consistency with the existing
      DeserializationExceptionHandler pattern.

      Limitation: This will enforce the same handling behavior for global
      exception handling that we defined for KStream processing exception
      handling. This keeps things simple but is not flexible enough for
      users who may want different behavior for GlobalKTable.

    Option 2: Separate Optional Configuration

      Introduce a new optional configuration:
      global.processing.exception.handler. If configured, it applies
      specifically to GlobalKTable processing errors; if not configured,
      exceptions bubble up to the uncaught exception handler (maintaining
      current behavior and backward compatibility).

      Limitation: Requires two configuration properties if users want
      exception handling for both regular streams and GlobalKTable.

    With Option 1, ProcessingExceptionHandler does not have a way to
    identify which thread is invoking it as of now. We may need to
    introduce a ThreadType (Stream or Global) in ErrorHandlerContext.

    With Option 2, the client would always be aware of the class it has
    implemented for GlobalKTables.

Thanks and Regards
Arpit Goyal
8861094754


On Fri, Jan 23, 2026 at 9:43 PM Bill Bejeck <[email protected]>
wrote:

Hi All,

Thanks for the KIP! Makes sense to me and helps make KS more robust.
I don't have any additional comments beyond what's been said so far.

-Bill

On Fri, Jan 23, 2026 at 5:52 AM Lucas Brutschy via dev <[email protected]> wrote:

Hi,

Overall, this makes sense to me. Thanks for the KIP!

LB1: I was wondering precisely what we are logging in the DLQ case. Do
you intend to log the full record content to make the record content
recoverable, or only some metadata. I suppose it's the latter.

LB2: Maybe I missed it (also not super fluent in that part of the
code), but will the implementers of the `ProcessingExceptionHandler` be
able to tell whether the error originated from a globalThread or a
StreamsThread? The implementer may want to specialize handling for
each case. This is not a must, but would be a nice to have for sure.

Cheers,
Lucas

On Thu, Jan 15, 2026 at 8:55 AM Arpit Goyal <[email protected]> wrote:

Hi All,
Looking for more input and feedback. This would help to move this KIP
forward.


Thanks and Regards
Arpit Goyal
8861094754

On Tue, 13 Jan, 2026, 2:17 pm Arpit Goyal, <[email protected]> wrote:

Thanks for the response Matthias.
I have updated the KIP to state that the KIP-1034 handleError() method
provides automatic backward compatibility. The DLQ part I already
mentioned under the Limitation section. Let me know if it needs to be
improved further.
Thanks and Regards
Arpit Goyal
8861094754


On Tue, Jan 13, 2026 at 5:05 AM Matthias J. Sax <[email protected]> wrote:

Thanks for the clarification. Makes sense to me.

Might be good to add some of these details (no code reference to
`ProcessorNode` etc necessary as it's impl detail) to the KIP. Ie, state
explicitly that the new handleError() will be used and that it provides
backward compatibility automatically based on its current implementation
from KIP-1034.

And that DLQ records, if returned, would be ignored and dropped and a
warning is logged about it for this case.



-Matthias


On 1/12/26 2:29 AM, Arpit Goyal wrote:
Thank you for the detailed questions! Let me clarify the implementation
approach:

      Which Method Will Be Called?

      GlobalThread will call the NEW handleError() method (not the
      deprecated handle()).

      Key Point: The exception handler is not called directly by
      GlobalThread. Instead, it's called by ProcessorNode.process(), which
      already invokes handleError() for regular processors.

      The implementation is straightforward:

      Current code (GlobalStateUpdateTask.initTopology - line 161):
      node.init((InternalProcessorContext) this.processorContext);  // No handler passed

      Proposed change:
      node.init((InternalProcessorContext) this.processorContext, processingExceptionHandler);  // Pass handler

      Once the handler is passed to ProcessorNode, the same code path that
      handles exceptions for regular KStream/KTable processors
      (ProcessorNode.process() line 236) will automatically handle
      GlobalKTable exceptions:

      Response response = processingExceptionHandler.handleError(errorHandlerContext, record, exception);

      There's no separate code path for GlobalThread - it reuses the
      existing ProcessorNode exception handling mechanism.

      Backward Compatibility

      The handleError() method provides automatic backward compatibility
      via its default implementation:

      default Response handleError(...) {
          return new Response(Result.from(handle(...)), Collections.emptyList());
      }

      - If users implement the old handle() method: handleError() delegates
        to it automatically
      - If users implement the new handleError() method: it's used directly
      - No code changes required for existing applications
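
The delegation pattern described above can be sketched in isolation as
follows. This is a simplified stand-in, not the real Kafka Streams
interface: the real handleError() returns a Response that can also carry
DLQ records, while this sketch returns only a result, and all type names
here are illustrative.

```java
// Simplified stand-in for the handler interface; NOT the real Kafka Streams API.
final class DefaultDelegationSketch {

    enum Result { RESUME, FAIL }

    interface Handler {
        // Old-style method: existing implementations override only this one.
        @Deprecated
        default Result handle(String record, Exception error) {
            throw new UnsupportedOperationException("implement handle() or handleError()");
        }

        // New-style method: by default it delegates to the old one, so legacy
        // handlers keep working when the runtime calls handleError().
        default Result handleError(String record, Exception error) {
            return handle(record, error);
        }
    }

    // A legacy handler that only knows about handle().
    static final class LegacyHandler implements Handler {
        @Override
        public Result handle(String record, Exception error) {
            return Result.RESUME;
        }
    }

    public static void main(String[] args) {
        Handler legacy = new LegacyHandler();
        // The runtime calls the new method; the default forwards to handle().
        System.out.println(legacy.handleError("bad-record", new RuntimeException("boom")));
    }
}
```

Because the caller only ever invokes handleError(), the global thread can
reuse the same call site as regular processors without knowing which of the
two methods a user implemented.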

      Dead Letter Queue (DLQ) Support

      This is where GlobalKTable differs from regular processors:

      The Limitation: GlobalThread does not have a Producer, so it cannot
      send DLQ records to Kafka.

      Proposed Approach:

      1. For KIP-1270: When ProcessorNode detects DLQ records but the
         context doesn't support RecordCollector (i.e., GlobalThread), it
         will log a warning instead of sending:

      log.warn("Dead letter queue records cannot be sent for GlobalKTable processors " +
               "(no producer available). DLQ support for GlobalKTable will be addressed in a future KIP. " +
               "Record details logged: topic={}, headers={}",
               ...);

      2. Future KIP: Full DLQ support for GlobalKTable (requires adding
         Producer infrastructure) will be proposed separately, as it's a
         larger architectural change.
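
The "warn instead of send" fallback can be sketched as below. This is a
self-contained illustration, not ProcessorNode code: the `sender` parameter
stands in for the record collector (null models the global thread, which
has no producer), warnings are collected in a list instead of going through
a logger, and the class name `DlqFallbackSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch only; the names here are not from Kafka Streams.
final class DlqFallbackSketch {

    // Collects warnings so the sketch has no logging dependency.
    final List<String> warnings = new ArrayList<>();

    // 'sender' stands in for the record collector; null models the global
    // thread, which has no producer. Returns the number of records sent.
    int dispatch(List<String> dlqRecords, Consumer<String> sender) {
        if (dlqRecords.isEmpty()) {
            return 0;
        }
        if (sender == null) {
            // No producer available: record a warning and drop the DLQ records.
            warnings.add("Dead letter queue records cannot be sent for GlobalKTable "
                + "processors (no producer available); dropping "
                + dlqRecords.size() + " record(s)");
            return 0;
        }
        dlqRecords.forEach(sender);
        return dlqRecords.size();
    }
}
```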

      How This Avoids User Confusion

      1. Single handler for all processors: Users configure ONE
         ProcessingExceptionHandler that works for both regular and global
         processors
      2. Consistent behavior: Result.RESUME continues, Result.FAIL stops -
         same for both
      3. Clear limitation: DLQ records are logged (not sent) for
         GlobalKTable, with explicit warning message
      4. Documentation: Config docs will clearly state DLQ sending
         limitation for GlobalKTable
Thanks and Regards
Arpit Goyal
8861094754


On Mon, Jan 12, 2026 at 7:40 AM Matthias J. Sax <[email protected]> wrote:

Thanks for the KIP Arpit.

Can you elaborate a little bit more on details? With the newly added DLQ
support for regular `Processor`, the existing `ProcessingHandlerResponse`
and corresponding method `handle()` are deprecated with the upcoming 4.2
release.

Thus, from AK 4.2+ going forward, users are expected to not implement
the old `handle()` (even if it's still supported, as long as the new
`handleError` is not overwritten).

Are you proposing, for now, to only add support for the deprecated
`handle()` method, ie, the new `handleError()` method would not be
called by the global-thread code? If yes, this might be confusing for
users?

If you do not propose this, would it imply that the global-thread code
would call the new `handleError()` method? For this case, the question
is what the runtime would do if users try to use the DLQ feature?

Overall, it's unclear to me what you propose in detail and how we can
avoid confusing users, keep it backward compatible, and make it easy to
understand how the handler will work.


-Matthias

On 1/10/26 8:33 AM, Arpit Goyal wrote:
Hi Team
Just reaching out again. I need your input to move this forward.

Thanks and Regards
Arpit Goyal
8861094754

On Thu, 8 Jan, 2026, 10:03 pm Arpit Goyal, <[email protected]> wrote:

Hi All,
I would like to start a discussion for KIP-1270. This KIP extends
ProcessingExceptionHandler support to GlobalKTable processors, enabling
consistent exception handling across all stream processing types.

*Current Behavior*

       When a processing exception occurs in a GlobalKTable processor:
       - The exception propagates to GlobalStreamThread
       - The GlobalStreamThread terminates
       - The entire Kafka Streams application shuts down
       - No user-configurable exception handling is available

*Proposed Behavior*

       After this KIP, when a processing exception occurs in a
       GlobalKTable processor:
       - The configured ProcessingExceptionHandler.handleError() will be
         invoked
       - If the handler returns Result.RESUME, processing continues with
         the next record
       - If the handler returns Result.FAIL, the exception propagates
         (same as current behavior)
       - If no handler is configured, behavior remains unchanged
         (backward compatible)
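
The proposed behavior can be sketched as a toy processing loop. This is
only an illustration under simplifying assumptions: records are plain
strings, the handler is modeled as a function returning RESUME or FAIL, a
null handler models "no handler configured", and none of the names below
come from the Kafka Streams code base.

```java
import java.util.List;
import java.util.function.BiFunction;

// Toy model of the proposed RESUME/FAIL semantics; not Kafka Streams code.
final class GlobalLoopSketch {

    enum Result { RESUME, FAIL }

    // Applies each record; on failure, asks the handler (if any) what to do.
    // Returns the number of records applied successfully.
    static int run(List<String> records,
                   BiFunction<String, Exception, Result> handler) {
        int applied = 0;
        for (String record : records) {
            try {
                process(record);
                applied++;
            } catch (RuntimeException e) {
                // No handler configured, or handler says FAIL: the exception
                // propagates, which matches the current behavior.
                if (handler == null || handler.apply(record, e) == Result.FAIL) {
                    throw e;
                }
                // Result.RESUME: skip this record and continue with the next one.
            }
        }
        return applied;
    }

    // Toy processor: records starting with "bad" fail.
    static void process(String record) {
        if (record.startsWith("bad")) {
            throw new IllegalStateException("cannot process " + record);
        }
    }
}
```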

KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Introduce+ProcessExceptionalHandler+for+GlobalThread

Thanks and Regards
Arpit Goyal
8861094754