Thanks for your review!
All the points make sense to us!
We updated the KIP for points 1 and 4.
2/ We followed the DeserializationExceptionHandler interface
signature; we did not intend for records to be forwarded through
the ProcessorContext.
The ProcessingContext is sufficient: we expect that most people
would need access to the RecordMetadata.
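As a rough illustration of the metadata-only use case, here is a minimal, self-contained sketch. RecordMeta, ReadOnlyContext and MetadataLoggingSketch are hypothetical stand-ins written for this email, not the Kafka Streams classes; the sketch assumes the context exposes the record metadata as an Optional and offers no forwarding method.

```java
import java.util.Optional;

// Hypothetical stand-in for the record metadata (topic, partition, offset).
final class RecordMeta {
    final String topic;
    final int partition;
    final long offset;

    RecordMeta(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// Hypothetical read-only context: it exposes metadata but has no forward().
interface ReadOnlyContext {
    Optional<RecordMeta> recordMetadata();
}

final class MetadataLoggingSketch {
    // Builds a short "where did this record come from" string for a log line.
    static String locate(ReadOnlyContext context) {
        return context.recordMetadata()
                .map(m -> m.topic + "-" + m.partition + "@" + m.offset)
                .orElse("unknown source"); // assumption: metadata may be absent
    }
}
```

A handler that only needs this kind of information has no use for the forwarding APIs.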
3/ The use of Record<Object, Object> is required, as the error could
occur in the middle of a processor where records could be
non-serializable objects.
As this is global error catching, the user may need only a little
information about the faulty record.
If users want to apply specific handling to a record, they can add a
try / catch block in the topology.
It is up to users to cast the record key and value in the implementation
of the ProcessorExceptionHandler.
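To make the casting point concrete, here is a minimal, self-contained sketch. SimpleRecord and CastingHandlerSketch are hypothetical stand-ins, not the KIP's or Kafka's API: a real handler would receive a Record<Object, Object>, and the processor name parameter is only assumed here for illustration.

```java
// Hypothetical stand-in for Record<Object, Object>; not the Kafka class.
final class SimpleRecord {
    private final Object key;
    private final Object value;

    SimpleRecord(Object key, Object value) {
        this.key = key;
        this.value = value;
    }

    Object key() { return key; }
    Object value() { return value; }
}

// Hypothetical handler logic: check the runtime type before casting.
final class CastingHandlerSketch {
    static String describeFaultyRecord(String processorName, SimpleRecord record) {
        Object value = record.value();
        if (value instanceof String) {
            String text = (String) value;
            return processorName + ": String value of length " + text.length();
        }
        if (value instanceof Number) {
            Number number = (Number) value;
            return processorName + ": numeric value " + number.longValue();
        }
        // Unknown or null value: report the type without casting.
        return processorName + ": value of type "
                + (value == null ? "null" : value.getClass().getName());
    }
}
```

Checking with instanceof before casting keeps the handler from throwing a ClassCastException of its own when a processor passes an unexpected type.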
Cheers
Loïc, Damien and Sébastien
________________________________
From: Sophie Blee-Goldman <sop...@responsive.dev>
Sent: Saturday, April 6, 2024 01:08
To: dev@kafka.apache.org <dev@kafka.apache.org>
Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception
handler for exceptions occuring during processing
Hi Damien,
First off, thanks for the KIP, this is definitely a much needed
feature. On the whole it seems pretty straightforward and I am in
favor of the proposal. Just a few questions and suggestions here
and there:
1. One of the #handle method's parameters is "ProcessorNode node", but
ProcessorNode is an internal class (and would expose a lot of internals
that we probably don't want to pass in to an exception handler). Would it
be sufficient to just make this a String and pass in the processor name?
2. Another of the parameters is the ProcessorContext. This would enable
the handler to potentially forward records, which imo should not be done
from the handler since it could only ever call #forward but not direct
where the record is actually forwarded to, and could cause confusion if
users aren't aware that the handler is effectively calling from the
context of the processor that threw the exception.
2a. If you don't explicitly want the ability to forward records, I would
suggest changing the type of this parameter to ProcessingContext, which
has all the metadata and useful info of the ProcessorContext but without
the forwarding APIs. This would also let us sidestep the following issue:
2b. If you *do* want the ability to forward records, setting aside
whether that in and of itself makes sense to do, we would need to pass in
either a regular ProcessorContext or a FixedKeyProcessorContext,
depending on what kind of processor it is. I'm not quite sure how we
could design a clean API here, so I'll hold off until you clarify whether
you even want forwarding or not.
We would also need to split the input record into a Record vs
FixedKeyRecord.
3. One notable difference between this handler and the existing ones you
pointed out, the Deserialization/ProductionExceptionHandler, is that the
records passed in to those are in serialized bytes, whereas the record
here would be POJOs. You account for this by making the parameter
type a Record<Object, Object>, but I just wonder how users would be
able to read the key/value and figure out what type it should be. For
example, would they need to maintain a map from processor name to
input record types?
If you could provide an example of this new feature in the KIP, it
would be very helpful in understanding whether we could do something to
make it easier for users to use, or if it would be fine as-is.
4. We should include all the relevant info for a new metric, such as the
metric group and recording level. You can look at other metrics KIPs like
KIP-444 and KIP-613 for an example. I suspect you intend for this to be
in the processor group and at the INFO level?
Hope that all makes sense! Thanks again for the KIP
-Sophie
On Fri, Mar 29, 2024 at 6:16 AM Damien Gasparina <d.gaspar...@gmail.com>
wrote:
Hi everyone,
After writing quite a few Kafka Streams applications, my colleagues and I
just created KIP-1033 to introduce a new Exception Handler in Kafka
Streams to simplify error handling.
This feature would allow defining an exception handler to automatically
catch exceptions occurring during the processing of a message.
KIP link:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
Feedback and suggestions are welcome,
Cheers,
Damien, Sebastien and Loic