RE: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-08 Thread Sebastien Viale
Thanks for your review!

All the points make sense to us!

We updated the KIP for points 1 and 4.

2/ We followed the DeserializationExceptionHandler interface signature; we did
not intend for the record to be forwarded with the ProcessorContext.

The ProcessingContext is sufficient; we do expect that most people would
need to access the RecordMetadata.

3/ The use of Record is required, as the error could occur in the middle of a
processor where records could be non-serializable objects.

As this is global error catching, the user may need only limited information
about the faulty record.

If users want to apply specific treatment to the record, they can add a
try / catch block in the topology.

It is up to users to cast the record value and key in the implementation of the
ProcessorExceptionHandler.
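
A minimal sketch of what such a cast could look like inside a handler. All
types here (FaultyRecord, Order, Response) are hypothetical stand-ins written
for illustration, not the Kafka Streams classes or the interface proposed in
the KIP, and the CONTINUE/FAIL policy is only an assumed example:

```java
import java.util.Objects;

// Hypothetical stand-ins for illustration only; NOT the Kafka Streams or KIP-1033 types.
final class HandlerCastSketch {

    /** Stand-in for a record whose key and value types are unknown to the handler. */
    static final class FaultyRecord {
        final Object key;
        final Object value;

        FaultyRecord(Object key, Object value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Hypothetical domain object that one of the processors works on. */
    static final class Order {
        final String id;

        Order(String id) {
            this.id = id;
        }
    }

    /** Stand-in for the handler's return value. */
    enum Response { CONTINUE, FAIL }

    /** Builds a log-friendly description by checking the runtime type before casting. */
    static String describe(FaultyRecord record) {
        Object value = record.value;
        if (value instanceof Order) {
            return "order " + ((Order) value).id;
        }
        if (value instanceof byte[]) {
            return ((byte[]) value).length + " raw bytes";
        }
        // Unknown type: fall back to a null-safe toString().
        return "unrecognized value: " + Objects.toString(value);
    }

    /** Assumed policy: keep going for known domain objects, fail on anything else. */
    static Response handle(FaultyRecord record, Exception error) {
        System.err.println("Processing failed for " + describe(record) + ": " + error.getMessage());
        return record.value instanceof Order ? Response.CONTINUE : Response.FAIL;
    }
}
```

The instanceof checks keep the handler safe when several processors with
different input types share the same global handler.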



Cheers

Loïc, Damien and Sébastien


From: Sophie Blee-Goldman
Sent: Saturday, April 6, 2024 01:08
To: dev@kafka.apache.org
Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for
exceptions occuring during processing

Hi Damien,

First off thanks for the KIP, this is definitely a much needed feature. On
the whole it seems pretty straightforward and I am in favor of the proposal.
Just a few questions and suggestions here and there:

1. One of the #handle method's parameters is "ProcessorNode node", but
ProcessorNode is an internal class (and would expose a lot of internals
that we probably don't want to pass in to an exception handler). Would it
be sufficient to just make this a String and pass in the processor name?

2. Another of the parameters is the ProcessorContext. This would enable
the handler to potentially forward records, which imo should not be done
from the handler since it could only ever call #forward but not direct where
the record is actually forwarded to, and could cause confusion if users
aren't aware that the handler is effectively calling from the context of the
processor that threw the exception.
2a. If you don't explicitly want the ability to forward records, I would
suggest changing the type of this parameter to ProcessingContext, which
has all the metadata and useful info of the ProcessorContext but without
the forwarding APIs. This would also let us sidestep the following issue:
2b. If you *do* want the ability to forward records, setting aside whether
that in and of itself makes sense to do, we would need to pass in either a
regular ProcessorContext or a FixedKeyProcessorContext, depending on what kind
of processor it is. I'm not quite sure how we could design a clean API here,
so I'll hold off until you clarify whether you even want forwarding or not.
We would also need to split the input record into a Record vs FixedKeyRecord.

3. One notable difference between this handler and the existing ones you
pointed out, the Deserialization/ProductionExceptionHandler, is that the
records passed in to those are in serialized bytes, whereas the records
here would be POJOs. You account for this by making the parameter
type a Record, but I just wonder how users would be
able to read the key/value and figure out what type it should be. For
example, would they need to maintain a map from processor name to
input record types?

If you could provide an example of this new feature in the KIP, it would be
very helpful in understanding whether we could do something to make it
easier for users to use, or if it would be fine as-is.

4. We should include all the relevant info for a new metric, such as the
metric group and recording level. You can look at other metrics KIPs like
KIP-444 and KIP-613 for an example. I suspect you intend for this to be in the
processor group and at the INFO level?
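
To illustrate point 2a, here is a rough sketch of how handing the handler a
narrower context type keeps forwarding out of reach at compile time. The
interfaces below (ReadOnlyContext, ForwardingContext, ErrorHandler) are
hypothetical stand-ins invented for this example and only mimic the
relationship between a metadata-only context and one with forwarding APIs;
they are not the Kafka Streams types:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; NOT the Kafka Streams interfaces.
final class ContextNarrowingSketch {

    /** Metadata-only view: nothing here can send a record downstream. */
    interface ReadOnlyContext {
        String applicationId();
        String taskId();
    }

    /** Full view: the metadata plus a forwarding method. */
    interface ForwardingContext extends ReadOnlyContext {
        void forward(Object key, Object value);
    }

    /** The handler is typed against the narrow view, so forward() is not visible to it. */
    interface ErrorHandler {
        String handle(ReadOnlyContext context, Exception error);
    }

    /** Simple in-memory context that remembers what was forwarded. */
    static final class SimpleContext implements ForwardingContext {
        final List<String> forwarded = new ArrayList<>();

        @Override
        public String applicationId() {
            return "demo-app";
        }

        @Override
        public String taskId() {
            return "0_1";
        }

        @Override
        public void forward(Object key, Object value) {
            forwarded.add(key + "=" + value);
        }
    }

    /** Passes the full context as its narrow supertype; the caller keeps the full view. */
    static String runHandler(ForwardingContext full, Exception error) {
        ErrorHandler handler =
            (ctx, e) -> ctx.applicationId() + "/" + ctx.taskId() + ": " + e.getMessage();
        return handler.handle(full, error);
    }
}
```

This is only a compile-time guard: a handler could still downcast, so the
narrow type documents intent rather than enforcing it.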

Hope that all makes sense! Thanks again for the KIP

-Sophie

On Fri, Mar 29, 2024 at 6:16 AM Damien Gasparina wrote:

> Hi everyone,
>
> After writing quite a few Kafka Streams applications, my colleagues and I
> just created KIP-1033 to introduce a new Exception Handler in Kafka Streams
> to simplify error handling.
> This feature would allow defining an exception handler to automatically
> catch exceptions occurring during the processing of a message.
>
> KIP link:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
>
> Feedback and suggestions are welcome,
>
> Cheers,
> Damien, Sebastien and Loic
>


Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-09 Thread Bruno Cadonna

Hi Loïc, Damien, and Sébastien,

Thanks for the KIP!
I find it really great that you contribute concepts you developed for
kstreamplify back to Kafka Streams so that everybody can benefit from
your improvements.

I have a couple of questions/comments:

1. and 2.
I am wondering whether we should expose the processor node ID -- which
basically is the processor node name -- in the ProcessingContext
interface. I think the processor node ID fits well in the
ProcessingContext interface since it already contains application ID and
task ID and it would make the API for the handler cleaner.

3.
Could you elaborate -- maybe with an example -- when a record is in a
state in which it cannot be serialized? This is not completely clear to me.

4.
Regarding the metrics, it is not entirely clear to me what the metric
measures. Is it the number of calls to the process handler or is it the
number of calls to the process handler that returned FAIL?
If it is the former, I was also wondering whether it would be better to
put the task-level metrics at the INFO reporting level and remove the
thread-level metric, similar to the dropped-records metric. You can
always roll up the metrics to the thread level in your preferred
monitoring system. Or do you think we would end up with too many metrics?

5.
What do you think about naming the handler ProcessingExceptionHandler
instead of ProcessExceptionHandler?
The DeserializationExceptionHandler and the ProductionExceptionHandler
also use the noun of the action in their name and not the verb.
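
As a rough illustration of the roll-up mentioned in point 4, the sketch below
sums task-level counts per thread outside of Kafka Streams. The TaskSample
class and the thread/task identifiers are hypothetical and stand in for
whatever a monitoring system would export; this is not a Kafka metrics API:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical monitoring-side helper; NOT part of the Kafka metrics API.
final class MetricRollupSketch {

    /** One counter value reported by one task running on one stream thread. */
    static final class TaskSample {
        final String threadId;
        final String taskId;
        final long count;

        TaskSample(String threadId, String taskId, long count) {
            this.threadId = threadId;
            this.taskId = taskId;
            this.count = count;
        }
    }

    /** Sums the task-level counts of every task that belongs to the same thread. */
    static Map<String, Long> rollUpByThread(List<TaskSample> samples) {
        Map<String, Long> totals = new TreeMap<>();
        for (TaskSample sample : samples) {
            // merge() inserts the count for a new thread or adds it to the running total.
            totals.merge(sample.threadId, sample.count, Long::sum);
        }
        return totals;
    }
}
```

Because the thread-level value can be derived this way, only the task-level
metric would need to be recorded by the application itself.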



Best,
Bruno



Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-09 Thread Bruno Cadonna

Hi again,

I have additional questions/comments.

6.
Which record exactly is passed to the handler?
Is it the input record to the task? Is it the input record to the 
processor node? Is it the input record to the processor?



7.
Could you please add the packages of the Java classes/interfaces/enums 
you want to add?



Best,
Bruno



Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-09 Thread Bill Bejeck
Hi Damien, Sebastien and Loic,

Thanks for the KIP, this is a much-needed addition.
I like the approach of getting the plumbing in for handling processor
errors, allowing users to implement more complex solutions as needed.

Overall, the KIP as it stands now LGTM, modulo outstanding comments.  I
think adding the example you included in this thread to the KIP is a great
idea.

Regarding the metrics, I'm thinking along the same lines as Bruno.  I'm
wondering if we can make do with a task-level metric at the INFO level and
the processor metric at DEBUG.  IMHO, when it comes to tracking exceptions
in processing, these two areas are where users will want to focus, higher
level metrics wouldn't be as useful in this case.

Thanks,
Bill


Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-09 Thread Damien Gasparina