Thanks for all the updates. Great discussion.

A few follow-up questions from my side:

99: I agree with Damien about Bruno's point (2). We should not return `Record` (see point 103 below for why).


100: Can we imagine a case in which the `ProcessingExceptionHandler` would want/need to have access to a Processor's state stores? When the handler is executed, we are technically still in the right context, and could maybe allow accessing the state store. Just a wild idea; I also don't want to enlarge the scope unnecessarily, and we might be able to do this in a follow-up KIP, too, if we believe it would be useful. I thought I would just throw out the idea for completeness.


101: Does the name `ErrorHandlerContext` align with what Sophie had in mind about using this interface somewhere else?


102 `ErrorHandlerContext`: Is it intentional to have both `partition()` and `taskId()` -- the `TaskId` encodes the partition implicitly, so it's kinda redundant to also have `partition()`. Don't feel strongly about it, but it might be worth calling out in the KIP why both are added.


103 `ErrorHandlerContext#headers()`: the return type is `Headers`, which does not ensure immutability. I believe we need to introduce a new `ReadOnlyHeaders` (or maybe some better name) interface...
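
To illustrate the immutability concern in 103, here is a minimal sketch of what a read-only headers view could look like. `HeaderEntry`, `ReadOnlyHeaders` (as defined here) and `ImmutableHeaders` are hypothetical names written against plain Java collections, not Kafka's `Headers`/`Header` types; the only point is that the view exposes no mutators and copies the underlying data defensively.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a single record header (not Kafka's Header interface).
final class HeaderEntry {
    private final String key;
    private final byte[] value;

    HeaderEntry(final String key, final byte[] value) {
        this.key = key;
        // Defensive copy: later changes to the caller's array are not visible here.
        this.value = value == null ? null : value.clone();
    }

    String key() {
        return key;
    }

    // Returns a copy so callers cannot mutate the stored bytes.
    byte[] value() {
        return value == null ? null : value.clone();
    }
}

// Hypothetical read-only view: lookups only, no add()/remove().
interface ReadOnlyHeaders extends Iterable<HeaderEntry> {
    HeaderEntry lastHeader(String key);

    List<HeaderEntry> toList();
}

final class ImmutableHeaders implements ReadOnlyHeaders {
    private final List<HeaderEntry> entries;

    ImmutableHeaders(final List<HeaderEntry> source) {
        // Copy first, then wrap, so the view is detached from the source list.
        this.entries = Collections.unmodifiableList(new ArrayList<>(source));
    }

    @Override
    public HeaderEntry lastHeader(final String key) {
        for (int i = entries.size() - 1; i >= 0; i--) {
            if (entries.get(i).key().equals(key)) {
                return entries.get(i);
            }
        }
        return null;
    }

    @Override
    public List<HeaderEntry> toList() {
        return entries;
    }

    @Override
    public Iterator<HeaderEntry> iterator() {
        return entries.iterator();
    }
}
```

With a view like this, a handler that keeps a reference to the headers cannot change what downstream code sees, and the runtime cannot change what the handler captured.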


104 `ErrorHandlerContext#convertToProcessorContext()`: I understand why this method was added, but I don't think that's the right approach to handle this case. We should not add this leaky abstraction IMHO, but instead add this method to a `DefaultImpl` class, and add a cast into the implementation from the interface to the class to access it. (Not 100% sure about the details of how to set up the code, so it would be great to get a POC PR up to see how we can do this w/o the need to add this method to the interface.)
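
A rough sketch of the cast-based approach from 104, under the assumption that the runtime always hands out its own implementation class. Everything here is illustrative: the `ErrorHandlerContext` shown is a cut-down stand-in, and `LegacyContext`, `DefaultErrorHandlerContext`, `toLegacyContext()` and `LegacyContextBridge` are hypothetical names, not code from the KIP or from Kafka.

```java
// Cut-down stand-in for the public interface: metadata accessors only,
// with no conversion method exposed to users.
interface ErrorHandlerContext {
    String topic();

    int partition();

    long offset();
}

// Hypothetical stand-in for the legacy context type that old handler methods expect.
final class LegacyContext {
    final String topic;
    final int partition;
    final long offset;

    LegacyContext(final String topic, final int partition, final long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// Hypothetical internal implementation: the conversion lives here, off the interface.
final class DefaultErrorHandlerContext implements ErrorHandlerContext {
    private final String topic;
    private final int partition;
    private final long offset;

    DefaultErrorHandlerContext(final String topic, final int partition, final long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    @Override
    public String topic() {
        return topic;
    }

    @Override
    public int partition() {
        return partition;
    }

    @Override
    public long offset() {
        return offset;
    }

    // Internal-only conversion used when delegating to an old handler method.
    LegacyContext toLegacyContext() {
        return new LegacyContext(topic, partition, offset);
    }
}

// Runtime-side helper: casts to the internal class instead of widening the interface.
final class LegacyContextBridge {
    static LegacyContext legacyContextOf(final ErrorHandlerContext context) {
        if (!(context instanceof DefaultErrorHandlerContext)) {
            throw new IllegalArgumentException(
                "Unsupported ErrorHandlerContext implementation: " + context.getClass().getName());
        }
        return ((DefaultErrorHandlerContext) context).toLegacyContext();
    }
}
```

The trade-off is that the cast fails for any third-party implementation of the interface, so this only works if the runtime is the sole producer of context instances.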


105 `ProductionExceptionHandler`: why does the existing method get a default implementation that throws an exception? It would be good to clarify in the KIP why this change is necessary in this way. -- Could we also let it `return FAIL` instead?


106 `ProductionExceptionHandler`: why do we add two new methods? IIRC, we added `handleSerializationException(...)` only because we could not re-use the existing `handle(...)` method (cf KIP-399: https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions). I think it's sufficient to only add a single new method which "blends/unifies" both existing ones:

  handle(final ErrorHandlerContext context,
         final ProducerRecord<?, ?> record, // no generic types
         final Exception exception)


107 `DeserializationExceptionHandler`: same question as above, about the default impl and letting it throw an exception.


108 `default.processing.exception.handler`: we use the prefix `default.` for both existing handlers, because we allow passing in topic-specific handlers via `Consumed` and `Produced` overrides, i.e., the default can be overridden. We don't want to allow passing in a Processor-specific handler, as pointed out in the "Rejected Alternatives" section, and thus the configured handler is not really a "default" as it cannot be overridden. For this case, we should drop the `default.` prefix in the config name.


109: Lucas brought up the idea on the KIP-1034 discussion to also include the `TimestampExtractor` interface for DLQ, which I think makes a lot of sense. Not sure if we would need any extensions in this KIP to get this done? I would rather include the timestamp extraction issue in the DLQ KIP from day one. The interface is quite different though, so we would need to think in a little more detail about how to do this. Right now, the contract is that returning `-1` as the extracted timestamp is an implicit "drop record" signal to the runtime, which is rather subtle. Can we do anything about this in a meaningful way?
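
To make the "implicit drop" contract from 109 concrete, here is a small self-contained sketch. It does not use Kafka's `TimestampExtractor`; `SimpleTimestampExtractor`, `TimestampDropDemo` and the `deadLetters` list are hypothetical stand-ins, and the filtering loop only mimics the behaviour described above (a negative extracted timestamp means the record is not processed).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a timestamp extractor; not Kafka's TimestampExtractor signature.
interface SimpleTimestampExtractor {
    long extract(String rawValue);
}

final class TimestampDropDemo {
    // Parses the value as epoch millis and returns -1 when it is not a number.
    static final SimpleTimestampExtractor PARSING = raw -> {
        try {
            return Long.parseLong(raw);
        } catch (NumberFormatException e) {
            return -1L; // the subtle part: -1 doubles as a "drop this record" signal
        }
    };

    // Returns the accepted records. Records with a negative timestamp are not
    // processed; they are collected in deadLetters instead of vanishing silently.
    static List<String> accept(final List<String> input,
                               final SimpleTimestampExtractor extractor,
                               final List<String> deadLetters) {
        final List<String> accepted = new ArrayList<>();
        for (final String raw : input) {
            if (extractor.extract(raw) < 0) {
                deadLetters.add(raw);
            } else {
                accepted.add(raw);
            }
        }
        return accepted;
    }
}
```

Routing the dropped records somewhere observable, as the `deadLetters` parameter does here, is one way the DLQ idea could make the signal less subtle; whether that belongs in this KIP or in KIP-1034 is the open question.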



-Matthias

On 4/22/24 8:20 AM, Damien Gasparina wrote:
Hi Bruno,

1) Good point, naming stuff is definitely hard, I renamed
ProcessingMetadata to ErrorHandlerContext

> Is there any reason you did not use something like
> Record<byte[], byte[]> sourceRecord()

2) The record class is used in many locations and, I assume, could be
expanded by new features.
As the error handler metadata could be stored in memory for a longer
duration due to the ProductionExceptionHandler, I think it is wiser to
have an independent class that is as lean as possible.
Maybe I am overthinking this, what do you think?

3) Oops, good point, I didn't notice that it was still there. I
removed the parameter.

4) To clarify the KIP, I specified the DeserializationExceptionHandler
interface in the KIP.

5) Good point, I updated the KIP with the order you mentioned:
context, record, exception

6) I was indeed thinking of extending the ProcessingContext to store
it there. Let me update the KIP to make it clear.

From past experience, deprecating an interface is always a bit
painful. In this KIP, I relied on default implementations to ensure
backward compatibility while encouraging people to implement the new
method signature. If you know a better approach, I'll take it :-)
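
A minimal sketch of the default-method pattern described above, assuming the old method throws by default and the new method delegates to the old one. `SketchResponse`, `SketchContext`, `SketchHandler`, `LegacyHandler` and `ModernHandler` are hypothetical names, and the signatures are simplified; they are not the ones proposed in the KIP.

```java
// Hypothetical response type standing in for the handlers' CONTINUE/FAIL responses.
enum SketchResponse { CONTINUE, FAIL }

// Hypothetical new-style context carrying metadata only.
final class SketchContext {
    private final String topic;

    SketchContext(final String topic) {
        this.topic = topic;
    }

    String topic() {
        return topic;
    }
}

// Hypothetical handler interface with an old and a new method.
interface SketchHandler {
    // Old method: it gets a default body so that new implementations need not
    // override it. Throwing here is one option; returning FAIL is the other.
    @Deprecated
    default SketchResponse handle(final String topic, final byte[] value, final Exception error) {
        throw new UnsupportedOperationException("implement handle(SketchContext, byte[], Exception)");
    }

    // New method: delegates to the old one by default, so existing
    // implementations keep working without changes.
    default SketchResponse handle(final SketchContext context, final byte[] value, final Exception error) {
        return handle(context.topic(), value, error);
    }
}

// Existing user code that only knows the old signature.
final class LegacyHandler implements SketchHandler {
    @Override
    @Deprecated
    public SketchResponse handle(final String topic, final byte[] value, final Exception error) {
        return SketchResponse.CONTINUE;
    }
}

// New user code that only implements the new signature.
final class ModernHandler implements SketchHandler {
    @Override
    public SketchResponse handle(final SketchContext context, final byte[] value, final Exception error) {
        return SketchResponse.FAIL;
    }
}
```

The runtime would only ever call the new method. The throwing default is reached only when an implementation overrides neither method, which is the case where letting it return FAIL would behave differently.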

Cheers,
Damien

On Mon, 22 Apr 2024 at 11:01, Bruno Cadonna <cado...@apache.org> wrote:

Hi Damien,

Thanks a lot for the updates!

I have the following comments:

(1)
Could you rename ProcessingMetadata to ErrorHandlerContext or
ErrorHandlerMetadata (I prefer the former)? I think it makes it
clearer for what this context/metadata is for.


(2)
Is there any reason you did not use something like

Record<byte[], byte[]> sourceRecord()

in ProcessingMetadata instead of sourceRawKey() and sourceRawValue() and
headers()? The headers() method refers to the record read from the input
topic of the sub-topology, right? If yes, maybe that is also something
to mention more explicitly.


(3)
Since you added the processor node ID to the ProcessingMetadata, you can
remove it from the signature of method handle() in
ProcessingExceptionHandler.


(4)
Where are the mentioned changes to the DeserializationExceptionHandler?


(5)
To be consistent, the order of the parameters in the
ProductionExceptionHandler should be:
1. context
2. record
3. exception


(6)
I am wondering where the implementation of ProcessingMetadata gets the
sourceRawKey/Value from. Do we need additional changes in
ProcessingContext and implementations?


Best,
Bruno


On 4/21/24 2:23 PM, Damien Gasparina wrote:
Hi Everyone,

Following the discussions on KIP-1033 and KIP-1034, we made a few changes:
    - We introduced a new ProcessingMetadata class containing only the
ProcessorContext metadata: topic, partition, offset, headers[],
sourceRawKey, sourceRawValue, TaskId, ProcessorNodeName
    - To be consistent, we propose to deprecate the existing
DeserializationExceptionHandler and ProductionExceptionHandler methods
to rely on the new ProcessingMetadata
    - The creation of the ProcessingMetadata and the deprecation of the old
methods are owned by KIP-1033. KIP-1034 (DLQ) now focuses only on the
Dead Letter Queue implementation without touching any interfaces. We
introduced a hard dependency of KIP-1034 on KIP-1033, which we think
is the wisest approach implementation-wise.
    - Instead of creating a new metric, KIP-1033 updates the
dropped-record metric.

Let me know what you think, if everything's fine, I think we should be
good to start a VOTE?

Cheers,
Damien





On Fri, 12 Apr 2024 at 22:24, Sophie Blee-Goldman <sop...@responsive.dev> wrote:

Fully agree about creating a new class for the bits of ProcessingContext
that are specific to metadata only. In fact, more or less this same point
just came up in the related KIP 1034 for DLQs, since the RecordMetadata
can't always be trusted to remain immutable. Maybe it's possible to solve
both issues at once, with the same class?

On another related note -- I had actually also just proposed that we
deprecate the existing DeserializationExceptionHandler method and replace
it with one using the new PAPI as part of KIP-1034. But now that I'm
reading this, I would say it probably makes more sense to do in this KIP.
We can also push that out into a smaller-scoped third KIP if you want, but
clearly, there is some overlap here and so however you guys (the authors)
want to organize this part of the work is fine with me. I do think it
should be done alongside/before this KIP and 1034 though, for all the
reasons already stated.

Everything else in the discussion so far I agree with! The
ProcessingContext thing is the only open question in my mind

On Thu, Apr 11, 2024 at 5:41 AM Damien Gasparina <d.gaspar...@gmail.com>
wrote:

Hi Matthias, Bruno,

1.a In my previous comment, by Processor Node ID, I meant
Processor name. This is important information to expose in the handler
as it allows users to identify the location of the exception in the
topology.
I assume this information could be useful in other places, that's why
I would lean toward adding this as an attribute in the
ProcessingContext.

1.b Looking at the ProcessingContext, I do think the following 3
methods should not be accessible in the exception handlers:
getStateStore(), schedule() and commit().
Having a separate interface would make for a cleaner signature. It would
also be a great time to ensure that all exception handlers are
consistent: at the moment,
DeserializationExceptionHandler.handle() relies on the old PAPI
ProcessorContext and ProductionExceptionHandler.handle() has none.
It could make sense to build the new interface in this KIP and track
the effort to migrate the existing handlers in a separate KIP, what do
you think?
Maybe I am overthinking this part and the ProcessingContext would be fine.

4. Good point regarding the dropped-record metric, as it is used by
the other handlers, I do think it makes sense to leverage it instead
of creating a new metric.
I will change the KIP so that it updates the dropped-record metric.

8. Regarding the DSL, I am aligned with Bruno, I think we could close
the gaps in a future KIP.

Cheers,
Damien


On Thu, 11 Apr 2024 at 11:56, Bruno Cadonna <cado...@apache.org> wrote:

Hi Matthias,


1.a
With processor node ID, I mean the ID that is exposed in the tags of
processor node metrics. That ID cannot be internal since it is exposed
in metrics. I think the processor name and the processor node ID are the
same thing. I followed how the processor node ID is set in metrics and I
ended up in addProcessor(name, ...).


1.b
Regarding ProcessingContext, I also thought about a separate class to
pass-in context information into the handler, but then I dismissed the
idea because I thought I was overthinking it. Apparently, I was not
overthinking it if you also had the same idea. So let's consider a
separate class.


4.
Regarding the metric, thanks for pointing to the dropped-record metric,
Matthias. The dropped-record metric is used with the deserialization
handler and the production handler. So, it would make sense to also use
it for this handler. However, the dropped-record metric only records
records that are skipped by the handler and not the number of calls to
the handler. But that difference is probably irrelevant since in case of
FAIL, the metric will be reset anyways since the stream thread will be
restarted. In conclusion, I think the dropped-record metric in
combination with a warn log message might be a better choice than
introducing a new metric.


8.
Regarding the DSL, I think we should close possible gaps in a separate
KIP.


Best,
Bruno

On 4/11/24 12:06 AM, Matthias J. Sax wrote:
Thanks for the KIP. Great discussion.

I am not sure if I understand the proposal from Bruno to hand in the
processor node id? Isn't this internal (I could not even find it
quickly)? We do have a processor name, right? Or am I mixing something
up?

Another question is about `ProcessingContext` -- it contains a lot of
(potentially irrelevant?) metadata. We should think carefully about what
we want to pass in and what not -- removing stuff is hard, but adding
stuff is easy. It's always an option to create a new interface that only
exposes stuff we find useful, and allows us to evolve this interface
independently of others. Re-using an existing interface always carries
the danger of introducing an undesired coupling that could bite us in
the future. -- It makes total sense to pass in `RecordMetadata`, but
`ProcessingContext` (even if already limited compared to
`ProcessorContext`) still seems to be too broad? For example, there are
the `getStateStore()` and `schedule()` methods, which I think we should
not expose.

The other interesting question is about "what record gets passed in".
For the PAPI, passing in the Processor's input record makes a lot of
sense. However, for DSL operators, I am not 100% sure? The DSL often
uses internal types not exposed to the user, and thus I am not sure if
users could write useful code for this case? -- In general, I still
agree that the handler should be implemented with a try-catch around
`Processor.process()` but it might not be too useful for DSL processors.
Hence, I am wondering if we need to do something more in the DSL? I
don't have a concrete proposal (a few high-level ideas only) and if we
don't do anything special for the DSL I am ok with moving forward with
this KIP as-is, but we should be aware of potential limitations for DSL
users. We can always do a follow-up KIP to close gaps when we understand
the impact better -- covering the DSL would also expand the scope of
this KIP significantly...

About the metric: just to double check. Do we think it's worth adding a
new metric? Or could we re-use the existing "dropped record metric"?



-Matthias


On 4/10/24 5:11 AM, Sebastien Viale wrote:
Hi,

You are right, it will simplify types.

We will update the KIP.

regards

Sébastien VIALE

MICHELIN GROUP - Information Technology
Technical Expert Kafka

Carmes / Bâtiment A17 4e / 63100 Clermont-Ferrand


------------------------------------------------------------------------
From: Bruno Cadonna <cado...@apache.org>
Sent: Wednesday, 10 April 2024 10:38
To: dev@kafka.apache.org <dev@kafka.apache.org>
Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception
handler for exceptions occuring during processing

Hi Loïc, Damien, and Sébastien,

Great that we are converging!


3.
Damien and Loïc, I think in your examples the handler will receive
Record<Object, Object> because a Record<Object, Object> is passed to
the processor in the following code line:

https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152


I see that we do not need to pass a Record<byte[], byte[]> into the
handler just because we do that for the DeserializationExceptionHandler
and the ProductionExceptionHandler. When those two handlers are called,
the record is already serialized. This is not the case for the
ProcessingExceptionHandler. However, I would propose to use Record<?, ?>
for the record that is passed to the ProcessingExceptionHandler because
it makes the handler API more flexible.
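
As a rough illustration of what a wildcard-typed record means for handler code, the sketch below inspects the value at runtime instead of relying on declared type parameters. `SketchRecord` and `WildcardHandlerSketch` are hypothetical stand-ins, not Kafka's `Record` class or the proposed handler interface.

```java
// Hypothetical stand-in for a processor input record; not Kafka's Record class.
final class SketchRecord<K, V> {
    private final K key;
    private final V value;

    SketchRecord(final K key, final V value) {
        this.key = key;
        this.value = value;
    }

    K key() {
        return key;
    }

    V value() {
        return value;
    }
}

final class WildcardHandlerSketch {
    // The wildcard signature accepts any SketchRecord<K, V>, so callers need no
    // unchecked cast; the handler decides at runtime what it can work with.
    static String describe(final SketchRecord<?, ?> record) {
        final Object value = record.value();
        if (value instanceof String) {
            return "string value of length " + ((String) value).length();
        }
        if (value instanceof Number) {
            return "numeric value " + value;
        }
        return "unhandled value type "
            + (value == null ? "null" : value.getClass().getSimpleName());
    }

    public static void main(final String[] args) {
        System.out.println(describe(new SketchRecord<>("k", "hello")));
        System.out.println(describe(new SketchRecord<>(1L, 42)));
    }
}
```

With `Record<Object, Object>` the caller would have to cast the processor's `Record<KIn, VIn>` before invoking the handler, whereas the wildcard form accepts it directly. Either way, the handler has to check types itself.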


Best,
Bruno





On 4/9/24 9:09 PM, Loic Greffier wrote:
   > Hi Bruno and Bill,
   >
   > To complement Damien's remarks on point 3:
   >
   > Processing errors are caught and handled by the
   > ProcessingExceptionHandler, at the precise moment when records are
   > processed by processor nodes. The handling will be performed in the
   > "process" method of the ProcessorNode, such as:
   >
   > public void process(final Record<KIn, VIn> record) {
   >     ...
   >
   >     try {
   >         ...
   >     } catch (final ClassCastException e) {
   >         ...
   >     } catch (Exception e) {
   >         ProcessingExceptionHandler.ProcessingHandlerResponse response =
   >             this.processingExceptionHandler
   >                 .handle(internalProcessorContext, (Record<Object, Object>) record, e);
   >
   >         if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
   >             throw new StreamsException(
   >                 "Processing exception handler is set to fail upon" +
   >                 " a processing error. If you would rather have the streaming pipeline" +
   >                 " continue after a processing error, please set the " +
   >                 DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
   >                 e);
   >         }
   >     }
   > }
   > As you can see, the record is transmitted to the
   > ProcessingExceptionHandler as a Record<Object, Object>, as we are
   > dealing with the input record of the processor at this point. It can
   > be any type, including non-serializable types, as suggested by
   > Damien's example. As the ProcessingExceptionHandler is not intended
   > to perform any serialization, there should be no issue for the users
   > to handle a Record<Object, Object>.
   >
   > I follow Damien on the other points.
   >
   > For point 6, underlying public interfaces are renamed as well:
   > - The ProcessingHandlerResponse
   > - The ProcessingLogAndContinueExceptionHandler/ProcessingLogAndFailExceptionHandler
   > - The configuration DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG
   >   (default.processing.exception.handler)
   >
   > Regards,
   >
   > Loïc
   >
   > From: Damien Gasparina <d.gaspar...@gmail.com>
   > Sent: Tuesday, 9 April 2024 20:08
   > To: dev@kafka.apache.org
   > Subject: Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams
   > exception handler for exceptions occuring during processing
   >
   > Hi Bruno, Bill,
   >
   > First of all, thanks a lot for all your useful comments.
   >
   >> 1. and 2.
   >> I am wondering whether we should expose the processor node ID --
   >> which basically is the processor node name -- in the
   >> ProcessingContext interface. I think the processor node ID fits well
   >> in the ProcessingContext interface since it already contains
   >> application ID and task ID and it would make the API for the handler
   >> cleaner.
   >
   > That's a good point, the actual ProcessorContextImpl is already
   > holding the current node in an attribute (currentNode), thus exposing
   > the node ID should not be a problem. Let me sleep on it and get back
   > to you regarding this point.
   >
   >> 3.
   >> Could you elaborate -- maybe with an example -- when a record is in a
   >> state in which it cannot be serialized? This is not completely clear
   >> to me.
   >
   > The Record passed to the handler is the input record to the
   > processor. In the Kafka Streams API, it could be any POJO,
   > e.g. with the following topology:
   >
   > streamsBuilder.stream("x")
   >     .map((k, v) -> new KeyValue<>("foo", Pair.of("hello", "world")))
   >     .foreach((k, v) -> { throw new RuntimeException(); });
   >
   > I would expect the handler to receive a Record<String, Pair<String,
   > String>>.
   >
   >> 4.
   >> Regarding the metrics, it is not entirely clear to me what the metric
   >> measures. Is it the number of calls to the process handler or is it
   >> the number of calls to process handler that returned FAIL?
   >> If it is the former, I was also wondering whether it would be better
   >> to put the task-level metrics to INFO reporting level and remove the
   >> thread-level metric, similar to the dropped-records metric. You can
   >> always roll-up the metrics to the thread level in your preferred
   >> monitoring system. Or do you think we end up with too many metrics?
   >
   > We were thinking of the former, measuring the number of calls to the
   > process handler. That's a good point, having the information at the
   > task level could be beneficial. I updated the KIP to change the metric
   > level and to clarify the wording.
   >
   >> 5.
   >> What do you think about naming the handler ProcessingExceptionHandler
   >> instead of ProcessExceptionHandler?
   >> The DeserializationExceptionHandler and the ProductionExceptionHandler
   >> also use the noun of the action in their name and not the verb.
   >
   > Good catch, I updated the KIP to rename it ProcessingExceptionHandler.
   >
   >> 6.
   >> What record is exactly passed to the handler?
   >> Is it the input record to the task? Is it the input record to the
   >> processor node? Is it the input record to the processor?
   >
   > The input record of the processor. I assume that is the most user
   > friendly record in this context.
   >
   >> 7.
   >> Could you please add the packages of the Java classes/interfaces/enums
   >> you want to add?
   >
   > Done, without any surprises: package org.apache.kafka.streams.errors;
   >
   >
   > Thanks a lot for your reviews! Cheers,
   > Damien
   >
   >
   >
   >
   > On Tue, 9 Apr 2024 at 18:04, Bill Bejeck <bbej...@gmail.com> wrote:
   >
   >> Hi Damien, Sebastien and Loic,
   >>
   >> Thanks for the KIP, this is a much-needed addition.
   >> I like the approach of getting the plumbing in for handling processor
   >> errors, allowing users to implement more complex solutions as needed.
   >>
   >> Overall, where the KIP is now LGTM, modulo outstanding comments. I
   >> think adding the example you included in this thread to the KIP is a
   >> great idea.
   >>
   >> Regarding the metrics, I'm thinking along the same lines as Bruno. I'm
   >> wondering if we can make do with a task-level metric at the INFO level
   >> and the processor metric at DEBUG. IMHO, when it comes to tracking
   >> exceptions in processing, these two areas are where users will want to
   >> focus; higher-level metrics wouldn't be as useful in this case.
   >>
   >> Thanks,
   >> Bill
   >>
   >> On Tue, Apr 9, 2024 at 6:54 AM Bruno Cadonna <cado...@apache.org> wrote:
   >>
   >>> Hi again,
   >>>
   >>> I have additional questions/comments.
   >>>
   >>> 6.
   >>> What record is exactly passed to the handler?
   >>> Is it the input record to the task? Is it the input record to the
   >>> processor node? Is it the input record to the processor?
   >>>
   >>>
   >>> 7.
   >>> Could you please add the packages of the Java
   >>> classes/interfaces/enums you want to add?
   >>>
   >>>
   >>> Best,
   >>> Bruno
   >>>
   >>>
   >>> On 4/9/24 10:17 AM, Bruno Cadonna wrote:
   >>>> Hi Loïc, Damien, and Sébastien,
   >>>>
   >>>> Thanks for the KIP!
   >>>> I find it really great that you contribute back to Kafka Streams
   >>>> concepts you developed for kstreamplify so that everybody can take
   >>>> advantage of your improvements.
   >>>>
   >>>> I have a couple of questions/comments:
   >>>>
   >>>> 1. and 2.
   >>>> I am wondering whether we should expose the processor node ID --
   >>>> which basically is the processor node name -- in the
   >>>> ProcessingContext interface. I think the processor node ID fits
   >>>> well in the ProcessingContext interface since it already contains
   >>>> application ID and task ID and it would make the API for the
   >>>> handler cleaner.
   >>>>
   >>>>
   >>>> 3.
   >>>> Could you elaborate -- maybe with an example -- when a record is in
   >>>> a state in which it cannot be serialized? This is not completely
   >>>> clear to me.
   >>>>
   >>>>
   >>>> 4.
   >>>> Regarding the metrics, it is not entirely clear to me what the
   >>>> metric measures. Is it the number of calls to the process handler
   >>>> or is it the number of calls to process handler that returned FAIL?
   >>>> If it is the former, I was also wondering whether it would be
   >>>> better to put the task-level metrics to INFO reporting level and
   >>>> remove the thread-level metric, similar to the dropped-records
   >>>> metric. You can always roll-up the metrics to the thread level in
   >>>> your preferred monitoring system. Or do you think we end up with
   >>>> too many metrics?
   >>>>
   >>>>
   >>>> 5.
   >>>> What do you think about naming the handler
   >>>> ProcessingExceptionHandler instead of ProcessExceptionHandler?
   >>>> The DeserializationExceptionHandler and the
   >>>> ProductionExceptionHandler also use the noun of the action in their
   >>>> name and not the verb.
   >>>>
   >>>>
   >>>> Best,
   >>>> Bruno
   >>>>
   >>>>
   >>>> On 4/8/24 3:48 PM, Sebastien Viale wrote:
   >>>>> Thanks for your review!
   >>>>>
   >>>>> All the points make sense for us!
   >>>>>
   >>>>>
   >>>>>
   >>>>> We updated the KIP for points 1 and 4.
   >>>>>
   >>>>>
   >>>>>
   >>>>> 2/ We followed the DeserializationExceptionHandler interface
   >>>>> signature; it was not on our mind that the record could be
   >>>>> forwarded with the ProcessorContext.
   >>>>>
   >>>>> The ProcessingContext is sufficient; we do expect that most people
   >>>>> would need to access the RecordMetadata.
   >>>>>
   >>>>> 3/ The use of Record<Object, Object> is required, as the error
   >>>>> could occur in the middle of a processor where records could be
   >>>>> non-serializable objects.
   >>>>>
   >>>>> As it is a global error-catching mechanism, the user may need
   >>>>> little information about the faulty record.
   >>>>>
   >>>>> Assuming that users want to apply some specific treatment to the
   >>>>> record, they can add a try / catch block in the topology.
   >>>>>
   >>>>> It is up to users to cast the record value and key in the
   >>>>> implementation of the ProcessorExceptionHandler.
   >>>>>
   >>>>>
   >>>>>
   >>>>> Cheers
   >>>>>
   >>>>> Loïc, Damien and Sébastien
   >>>>>
   >>>>> ________________________________
   >>>>> From: Sophie Blee-Goldman <sop...@responsive.dev>
   >>>>> Sent: Saturday, 6 April 2024 01:08
   >>>>> To: dev@kafka.apache.org
   >>>>> Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception
   >>>>> handler for exceptions occuring during processing
   >>>>>
   >>>>> Hi Damien,
   >>>>>
   >>>>> First off thanks for the KIP, this is definitely a much needed
   >>>>> feature. On the whole it seems pretty straightforward and I am in
   >>>>> favor of the proposal. Just a few questions and suggestions here
   >>>>> and there:
   >>>>>
   >>>>> 1. One of the #handle method's parameters is "ProcessorNode node",
   >>>>> but ProcessorNode is an internal class (and would expose a lot of
   >>>>> internals that we probably don't want to pass in to an exception
   >>>>> handler). Would it be sufficient to just make this a String and
   >>>>> pass in the processor name?
   >>>>>
   >>>>> 2. Another of the parameters is the ProcessorContext. This would
   >>>>> enable the handler to potentially forward records, which imo
   >>>>> should not be done from the handler since it could only ever call
   >>>>> #forward but not direct where the record is actually forwarded to,
   >>>>> and could cause confusion if users aren't aware that the handler
   >>>>> is effectively calling from the context of the processor that
   >>>>> threw the exception.
   >>>>> 2a. If you don't explicitly want the ability to forward records, I
   >>>>> would suggest changing the type of this parameter to
   >>>>> ProcessingContext, which has all the metadata and useful info of
   >>>>> the ProcessorContext but without the forwarding APIs. This would
   >>>>> also let us sidestep the following issue:
   >>>>> 2b. If you *do* want the ability to forward records, setting aside
   >>>>> whether that in and of itself makes sense to do, we would need to
   >>>>> pass in either a regular ProcessorContext or a
   >>>>> FixedKeyProcessorContext, depending on what kind of processor it
   >>>>> is. I'm not quite sure how we could design a clean API here, so
   >>>>> I'll hold off until you clarify whether you even want forwarding
   >>>>> or not. We would also need to split the input record into a Record
   >>>>> vs FixedKeyRecord.
   >>>>>
   >>>>> 3. One notable difference between this handler and the existing
   >>>>> ones you pointed out, the
   >>>>> Deserialization/ProductionExceptionHandler, is that the records
   >>>>> passed in to those are in serialized bytes, whereas the record
   >>>>> here would be POJOs. You account for this by making the parameter
   >>>>> type a Record<Object, Object>, but I just wonder how users would
   >>>>> be able to read the key/value and figure out what type it should
   >>>>> be. For example, would they need to maintain a map from processor
   >>>>> name to input record types?
   >>>>>
   >>>>> If you could provide an example of this new feature in the KIP, it
   >>>>> would be very helpful in understanding whether we could do
   >>>>> something to make it easier for users to use, or if it would be
   >>>>> fine as-is.
   >>>>>
   >>>>> 4. We should include all the relevant info for a new metric, such
   >>>>> as the metric group and recording level. You can look at other
   >>>>> metrics KIPs like KIP-444 and KIP-613 for an example. I suspect
   >>>>> you intend for this to be in the processor group and at the INFO
   >>>>> level?
   >>>>>
   >>>>> Hope that all makes sense! Thanks again for the KIP
   >>>>>
   >>>>> -Sophie
   >>>>>
   >>>>> On Fri, Mar 29, 2024 at 6:16 AM Damien Gasparina
   >>>>> <d.gaspar...@gmail.com> wrote:
   >>>>>
   >>>>>> Hi everyone,
   >>>>>>
   >>>>>> After writing quite a few Kafka Streams applications, my
   >>>>>> colleagues and I just created KIP-1033 to introduce a new
   >>>>>> Exception Handler in Kafka Streams to simplify error handling.
   >>>>>> This feature would allow defining an exception handler to
   >>>>>> automatically catch exceptions occurring during the processing of
   >>>>>> a message.
   >>>>>>
   >>>>>> KIP link:
   >>>>>>
   >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
   >>>>>>
   >>>>>> Feedback and suggestions are welcome,
   >>>>>>
   >>>>>> Cheers,
   >>>>>> Damien, Sebastien and Loic
   >>>>>>
   >>>>>
   >>>>>
   >>>>>
   >>>
   >>

