Fully agree about creating a new class for the bits of ProcessingContext
that are specific to metadata only. In fact, more or less this same point
just came up in the related KIP-1034 for DLQs, since the RecordMetadata
can't always be trusted to remain immutable. Maybe it's possible to solve
both issues at once, with the same class?
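
To make the idea concrete, here is a minimal sketch of what a metadata-only
class could look like. Every name in it (HandlerContextSketch, HandlerContext,
Snapshot) is hypothetical and not taken from either KIP. The only point is that
the handler gets a read-only view whose values are copied once, at the time of
the failure, so nothing it sees can change underneath it.

```java
public class HandlerContextSketch {

    // Hypothetical read-only view: metadata only, with no getStateStore(),
    // schedule(), commit(), or forward().
    interface HandlerContext {
        String applicationId();
        String taskId();
        String processorNodeId();
        String topic();
        int partition();
        long offset();
    }

    // Immutable snapshot: every value is copied when the failure is caught.
    static final class Snapshot implements HandlerContext {
        private final String applicationId;
        private final String taskId;
        private final String processorNodeId;
        private final String topic;
        private final int partition;
        private final long offset;

        Snapshot(String applicationId, String taskId, String processorNodeId,
                 String topic, int partition, long offset) {
            this.applicationId = applicationId;
            this.taskId = taskId;
            this.processorNodeId = processorNodeId;
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }

        @Override public String applicationId() { return applicationId; }
        @Override public String taskId() { return taskId; }
        @Override public String processorNodeId() { return processorNodeId; }
        @Override public String topic() { return topic; }
        @Override public int partition() { return partition; }
        @Override public long offset() { return offset; }
    }

    public static void main(String[] args) {
        // Example values are made up for illustration.
        HandlerContext ctx =
            new Snapshot("my-app", "0_1", "KSTREAM-MAP-0000000001", "orders", 1, 42L);
        System.out.println(ctx.processorNodeId() + " failed at "
            + ctx.topic() + "-" + ctx.partition() + "@" + ctx.offset());
    }
}
```

The same type could, in principle, be handed to all three handlers, which is
what would let one class cover both KIPs.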

On another related note -- I had actually also just proposed that we
deprecate the existing DeserializationExceptionHandler method and replace
it with one using the new PAPI as part of KIP-1034. But now that I'm
reading this, I would say it probably makes more sense to do in this KIP.
We can also push that out into a smaller-scoped third KIP if you want, but
clearly, there is some overlap here and so however you guys (the authors)
want to organize this part of the work is fine with me. I do think it
should be done alongside/before this KIP and 1034 though, for all the
reasons already stated.
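
For what it's worth, the deprecate-and-replace step could follow the usual Java
pattern of a new default method that delegates to the old one. The sketch below
uses stand-in types only (OldContext, NewContext, Response, HandlerSketch and
LegacyHandler are all hypothetical); it is not the actual
DeserializationExceptionHandler signature, just an illustration of how existing
implementations could keep working unchanged.

```java
public class HandlerMigrationSketch {

    // Stand-ins for the old PAPI context and the new metadata-only context.
    interface OldContext { String topic(); }
    interface NewContext { String topic(); }

    enum Response { CONTINUE, FAIL }

    interface HandlerSketch {
        // Old method: deprecated, and given a default body so that new
        // implementations no longer have to provide it.
        @Deprecated
        default Response handle(OldContext context, byte[] rawRecord, Exception exception) {
            throw new UnsupportedOperationException("old handle() is not implemented");
        }

        // New method: by default it adapts the new context and delegates to
        // the old method, so existing implementations are still called.
        default Response handle(NewContext context, byte[] rawRecord, Exception exception) {
            OldContext adapted = () -> context.topic();
            return handle(adapted, rawRecord, exception);
        }
    }

    // An existing user implementation that only knows the old method.
    static final class LegacyHandler implements HandlerSketch {
        @Override
        @SuppressWarnings("deprecation")
        public Response handle(OldContext context, byte[] rawRecord, Exception exception) {
            return Response.CONTINUE;
        }
    }

    public static void main(String[] args) {
        NewContext ctx = () -> "orders";
        // The runtime calls the new method; the legacy override still decides.
        Response response =
            new LegacyHandler().handle(ctx, new byte[0], new RuntimeException("boom"));
        System.out.println(response);
    }
}
```

With this shape the runtime only ever calls the new overload, and the old one
can be removed in a later major release.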

Everything else in the discussion so far I agree with! The
ProcessingContext thing is the only open question in my mind.

On Thu, Apr 11, 2024 at 5:41 AM Damien Gasparina <d.gaspar...@gmail.com>
wrote:

> Hi Matthias, Bruno,
>
> 1.a In my previous comment, by Processor Node ID, I meant
> Processor name. This is important information to expose in the handler
> as it allows users to identify the location of the exception in the
> topology.
> I assume this information could be useful in other places, which is why
> I would lean toward adding this as an attribute in the
> ProcessingContext.
>
> 1.b Looking at the ProcessingContext, I do think the following 3
> methods should not be accessible in the exception handlers:
> getStateStore(), schedule() and commit().
> Having a separate interface would make for a cleaner signature. It would
> also be a great time to ensure that all exception handlers are
> consistent. At the moment,
> DeserializationExceptionHandler.handle() relies on the old PAPI
> ProcessorContext and ProductionExceptionHandler.handle() has none.
> It could make sense to build the new interface in this KIP and track
> the effort to migrate the existing handlers in a separate KIP. What do
> you think?
> Maybe I am overthinking this part and the ProcessingContext would be fine.
>
> 4. Good point regarding the dropped-record metric. As it is used by
> the other handlers, I do think it makes sense to leverage it instead
> of creating a new metric.
> I will update the KIP to use the dropped-record metric.
>
> 8. Regarding the DSL, I am aligned with Bruno; I think we could close
> the gaps in a future KIP.
>
> Cheers,
> Damien
>
>
> On Thu, 11 Apr 2024 at 11:56, Bruno Cadonna <cado...@apache.org> wrote:
> >
> > Hi Matthias,
> >
> >
> > 1.a
> > With processor node ID, I mean the ID that is exposed in the tags of
> > processor node metrics. That ID cannot be internal since it is exposed
> > in metrics. I think the processor name and the processor node ID are the
> > same thing. I followed how the processor node ID is set in metrics and I
> > ended up in addProcessor(name, ...).
> >
> >
> > 1.b
> > Regarding ProcessingContext, I also thought about a separate class to
> > pass context information into the handler, but then I dismissed the
> > idea because I thought I was overthinking it. Apparently, I was not
> > overthinking it if you also had the same idea. So let's consider a
> > separate class.
> >
> >
> > 4.
> > Regarding the metric, thanks for pointing to the dropped-record metric,
> > Matthias. The dropped-record metric is used with the deserialization
> > handler and the production handler. So, it would make sense to also use
> > it for this handler. However, the dropped-record metric only records
> > records that are skipped by the handler and not the number of calls to
> > the handler. But that difference is probably irrelevant since in case of
> > FAIL, the metric will be reset anyway since the stream thread will be
> > restarted. In conclusion, I think the dropped-record metric in
> > combination with a warn log message might be a better choice than
> > introducing a new metric.
> >
> >
> > 8.
> > Regarding the DSL, I think we should close possible gaps in a separate KIP.
> >
> >
> > Best,
> > Bruno
> >
> > On 4/11/24 12:06 AM, Matthias J. Sax wrote:
> > > Thanks for the KIP. Great discussion.
> > >
> > > I am not sure if I understand the proposal from Bruno to hand in the
> > > processor node id? Isn't this internal (could not even find it quickly).
> > > We do have a processor name, right? Or am I mixing something up?
> > >
> > > Another question is about `ProcessingContext` -- it contains a lot of
> > > (potentially irrelevant?) metadata. We should think carefully about what
> > > we want to pass in and what not -- removing stuff is hard, but adding
> > > stuff is easy. It's always an option to create a new interface that only
> > > exposes stuff we find useful, and allows us to evolve this interface
> > > independent of others. Re-using an existing interface always has the
> > > danger of introducing an undesired coupling that could bite us in the
> > > future. -- It makes total sense to pass in `RecordMetadata`, but
> > > `ProcessingContext` (even if already limited compared to
> > > `ProcessorContext`) still seems to be too broad? For example, there are
> > > `getStateStore()` and `schedule()` methods which I think we should not
> > > expose.
> > >
> > > The other interesting question is about "what record gets passed in".
> > > For the PAPI, passing in the Processor's input record makes a lot of
> > > sense. However, for DSL operators, I am not 100% sure? The DSL often
> > > uses internal types not exposed to the user, and thus I am not sure if
> > > users could write useful code for this case? -- In general, I still
> > > agree that the handler should be implemented with a try-catch around
> > > `Processor.process()` but it might not be too useful for DSL processors.
> > > Hence, I am wondering if we need to do something more in the DSL? I
> > > don't have a concrete proposal (a few high level ideas only) and if we
> > > don't do anything special for the DSL I am ok with moving forward with
> > > this KIP as-is, but we should be aware of potential limitations for DSL
> > > users. We can always do a follow up KIP to close gaps when we understand
> > > the impact better -- covering the DSL would also expand the scope of
> > > this KIP significantly...
> > >
> > > About the metric: just to double check. Do we think it's worth adding a
> > > new metric? Or could we re-use the existing "dropped record metric"?
> > >
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 4/10/24 5:11 AM, Sebastien Viale wrote:
> > >> Hi,
> > >>
> > >> You are right, it will simplify types.
> > >>
> > >> We update the KIP
> > >>
> > >> regards
> > >>
> > > >> Sébastien VIALE
> > > >>
> > > >> MICHELIN GROUP - Information Technology
> > > >> Technical Expert Kafka
> > >>
> > >>   Carmes / Bâtiment A17 4e / 63100 Clermont-Ferrand
> > >>
> > >>
> ------------------------------------------------------------------------
> > > >> *From:* Bruno Cadonna <cado...@apache.org>
> > > >> *Sent:* Wednesday, April 10, 2024 10:38
> > > >> *To:* dev@kafka.apache.org <dev@kafka.apache.org>
> > > >> *Subject:* [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception
> > > >> handler for exceptions occuring during processing
> > >>
> > >> Hi Loïc, Damien, and Sébastien,
> > >>
> > >> Great that we are converging!
> > >>
> > >>
> > >> 3.
> > > >> Damien and Loïc, I think in your examples the handler will receive
> > > >> Record<Object, Object> because a Record<Object, Object> is passed to
> > > >> the processor in the following code line:
> > > >> https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152
> > > >>
> > > >> I see that we do not need to pass into the handler a Record<byte[],
> > > >> byte[]> just because we do that for the DeserializationExceptionHandler
> > > >> and the ProductionExceptionHandler. When those two handlers are called,
> > > >> the record is already serialized. This is not the case for the
> > > >> ProcessingExceptionHandler. However, I would propose to use Record<?, ?>
> > > >> for the record that is passed to the ProcessingExceptionHandler because
> > > >> it makes the handler API more flexible.
> > >>
> > >>
> > >> Best,
> > >> Bruno
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> On 4/9/24 9:09 PM, Loic Greffier wrote:
> > >>  > Hi Bruno and Bill,
> > >>  >
> > > >>  > To complement Damien's remarks on point 3:
> > > >>  >
> > > >>  > Processing errors are caught and handled by the
> > > >>  > ProcessingErrorHandler, at the precise moment when records are
> > > >>  > processed by processor nodes. The handling will be performed in the
> > > >>  > "process" method of the ProcessorNode, such as:
> > >>  >
> > > >>  > public void process(final Record<KIn, VIn> record) {
> > > >>  >     ...
> > > >>  >
> > > >>  >     try {
> > > >>  >         ...
> > > >>  >     } catch (final ClassCastException e) {
> > > >>  >         ...
> > > >>  >     } catch (Exception e) {
> > > >>  >         ProcessingExceptionHandler.ProcessingHandlerResponse response =
> > > >>  >             this.processingExceptionHandler
> > > >>  >                 .handle(internalProcessorContext, (Record<Object, Object>) record, e);
> > > >>  >
> > > >>  >         if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
> > > >>  >             throw new StreamsException("Processing exception handler is set to fail upon" +
> > > >>  >                 " a processing error. If you would rather have the streaming pipeline" +
> > > >>  >                 " continue after a processing error, please set the " +
> > > >>  >                 DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
> > > >>  >                 e);
> > > >>  >         }
> > > >>  >     }
> > > >>  > }
> > > >>  > As you can see, the record is transmitted to the
> > > >>  > ProcessingExceptionHandler as a Record<Object,Object>, as we are
> > > >>  > dealing with the input record of the processor at this point. It can
> > > >>  > be any type, including non-serializable types, as suggested by
> > > >>  > Damien's example. As the ProcessingErrorHandler is not intended to
> > > >>  > perform any serialization, there should be no issue for the users to
> > > >>  > handle a Record<Object,Object>.
> > >>  >
> > >>  > I follow Damien on the other points.
> > >>  >
> > > >>  > For point 6, underlying public interfaces are renamed as well:
> > > >>  > - The ProcessingHandlerResponse
> > > >>  > - The ProcessingLogAndContinueExceptionHandler/ProcessingLogAndFailExceptionHandler
> > > >>  > - The configuration DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG
> > > >>  >   (default.processing.exception.handler)
> > >>  >
> > >>  > Regards,
> > >>  >
> > >>  > Loïc
> > >>  >
> > > >>  > From: Damien Gasparina <d.gaspar...@gmail.com>
> > > >>  > Sent: Tuesday, April 9, 2024 20:08
> > > >>  > To: dev@kafka.apache.org
> > > >>  > Subject: Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams
> > > >>  > exception handler for exceptions occuring during processing
> > >>  >
> > >>  >
> > >>  > Hi Bruno, Bill,
> > >>  >
> > >>  > First of all, thanks a lot for all your useful comments.
> > >>  >
> > > >>  >> 1. and 2.
> > > >>  >> I am wondering whether we should expose the processor node ID -- which
> > > >>  >> basically is the processor node name -- in the ProcessingContext
> > > >>  >> interface. I think the processor node ID fits well in the
> > > >>  >> ProcessingContext interface since it already contains application ID and
> > > >>  >> task ID and it would make the API for the handler cleaner.
> > > >>  >
> > > >>  > That's a good point, the actual ProcessorContextImpl is already holding the
> > > >>  > current node in an attribute (currentNode), thus exposing the node ID should
> > > >>  > not be a problem. Let me sleep on it and get back to you regarding this
> > > >>  > point.
> > >>  >
> > >>  >> 3.
> > >>  >> Could you elaborate -- maybe with an example -- when a record is
> in a
> > >>  >> state in which it cannot be serialized? This is not completely
> > >> clear to
> > >>  > me.
> > >>  >
> > >>  > The Record passed to the handler is the input record to the
> > >> processor. In
> > >>  > the Kafka Streams API, it could be any POJO.
> > >>  > e.g. with the following topology `
> > >>  > streamsBuilder.stream("x")
> > >>  > .map((k, v) -> new KeyValue("foo", Pair.of("hello",
> > >>  > "world")))
> > >>  > .forEach((k, v) -> throw new RuntimeException())
> > >>  > I would expect the handler to receive a Record<String, Pair<String,
> > >>  > String>>.
> > >>  >
> > >>  >> 4.
> > >>  >> Regarding the metrics, it is not entirely clear to me what the
> metric
> > >>  >> measures. Is it the number of calls to the process handler or is
> > >> it the
> > >>  >> number of calls to process handler that returned FAIL?
> > >>  >> If it is the former, I was also wondering whether it would be
> > >> better to
> > >>  >> put the task-level metrics to INFO reporting level and remove the
> > >>  >> thread-level metric, similar to the dropped-records metric. You
> can
> > >>  >> always roll-up the metrics to the thread level in your preferred
> > >>  >> monitoring system. Or do you think we end up with to many metrics?
> > >>  >
> > >>  > We were thinking of the former, measuring the number of calls to
> the
> > >>  > process handler. That's a good point, having the information at the
> > >> task
> > >>  > level could be beneficial. I updated the KIP to change the metric
> > >> level
> > >>  > and to clarify the wording.
> > >>  >
> > >>  >> 5.
> > >>  >> What do you think about naming the handler
> ProcessingExceptionHandler
> > >>  >> instead of ProcessExceptionHandler?
> > >>  >> The DeserializationExceptionHanlder and the
> > >> ProductionExceptionHandler
> > >>  >> also use the noun of the action in their name and not the verb.
> > >>  >
> > >>  > Good catch, I updated the KIP to rename it
> ProcessingExceptionHandler.
> > >>  >
> > >>  >> 6.
> > >>  >> What record is exactly passed to the handler?
> > >>  >> Is it the input record to the task? Is it the input record to the
> > >>  >> processor node? Is it the input record to the processor?
> > >>  >
> > >>  > The input record of the processor. I assume that is the most user
> > >>  > friendly record in this context.
> > >>  >
> > >>  >> 7.
> > > >>  >> Could you please add the packages of the Java classes/interfaces/enums
> > > >>  >> you want to add?
> > > >>  >
> > > >>  > Done, without any surprises: package org.apache.kafka.streams.errors;
> > >>  >
> > >>  >
> > >>  > Thanks a lot for your reviews! Cheers,
> > >>  > Damien
> > >>  >
> > >>  >
> > >>  >
> > >>  >
> > > >>  > On Tue, 9 Apr 2024 at 18:04, Bill Bejeck <bbej...@gmail.com> wrote:
> > >>  >
> > >>  >> Hi Damien, Sebastien and Loic,
> > >>  >>
> > >>  >> Thanks for the KIP, this is a much-needed addition.
> > > >>  >> I like the approach of getting the plumbing in for handling processor
> > > >>  >> errors, allowing users to implement more complex solutions as needed.
> > > >>  >>
> > > >>  >> Overall, where the KIP is now LGTM, modulo outstanding comments. I
> > > >>  >> think adding the example you included in this thread to the KIP is a great
> > > >>  >> idea.
> > > >>  >>
> > > >>  >> Regarding the metrics, I'm thinking along the same lines as Bruno. I'm
> > > >>  >> wondering if we can make do with a task-level metric at the INFO level and
> > > >>  >> the processor metric at DEBUG. IMHO, when it comes to tracking exceptions
> > > >>  >> in processing, these two areas are where users will want to focus; higher
> > > >>  >> level metrics wouldn't be as useful in this case.
> > >>  >>
> > >>  >> Thanks,
> > >>  >> Bill
> > >>  >>
> > > >>  >> On Tue, Apr 9, 2024 at 6:54 AM Bruno Cadonna <cado...@apache.org> wrote:
> > >>  >>
> > >>  >>> Hi again,
> > >>  >>>
> > >>  >>> I have additional questions/comments.
> > >>  >>>
> > >>  >>> 6.
> > >>  >>> What record is exactly passed to the handler?
> > >>  >>> Is it the input record to the task? Is it the input record to the
> > >>  >>> processor node? Is it the input record to the processor?
> > >>  >>>
> > >>  >>>
> > >>  >>> 7.
> > > >>  >>> Could you please add the packages of the Java classes/interfaces/enums
> > > >>  >>> you want to add?
> > >>  >>>
> > >>  >>>
> > >>  >>> Best,
> > >>  >>> Bruno
> > >>  >>>
> > >>  >>>
> > >>  >>> On 4/9/24 10:17 AM, Bruno Cadonna wrote:
> > >>  >>>> Hi Loïc, Damien, and Sébastien,
> > >>  >>>>
> > >>  >>>> Thanks for the KIP!
> > > >>  >>>> I find it really great that you contribute back to Kafka Streams
> > > >>  >>>> concepts you developed for kstreamplify so that everybody can take
> > > >>  >>>> advantage of your improvements.
> > >>  >>>>
> > >>  >>>> I have a couple of questions/comments:
> > >>  >>>>
> > >>  >>>> 1. and 2.
> > > >>  >>>> I am wondering whether we should expose the processor node ID -- which
> > > >>  >>>> basically is the processor node name -- in the ProcessingContext
> > > >>  >>>> interface. I think the processor node ID fits well in the
> > > >>  >>>> ProcessingContext interface since it already contains application ID and
> > > >>  >>>> task ID and it would make the API for the handler cleaner.
> > > >>  >>>>
> > > >>  >>>>
> > > >>  >>>> 3.
> > > >>  >>>> Could you elaborate -- maybe with an example -- when a record is in a
> > > >>  >>>> state in which it cannot be serialized? This is not completely clear to
> > > >>  >>>> me.
> > > >>  >>>>
> > > >>  >>>>
> > > >>  >>>> 4.
> > > >>  >>>> Regarding the metrics, it is not entirely clear to me what the metric
> > > >>  >>>> measures. Is it the number of calls to the process handler or is it the
> > > >>  >>>> number of calls to process handler that returned FAIL?
> > > >>  >>>> If it is the former, I was also wondering whether it would be better to
> > > >>  >>>> put the task-level metrics to INFO reporting level and remove the
> > > >>  >>>> thread-level metric, similar to the dropped-records metric. You can
> > > >>  >>>> always roll-up the metrics to the thread level in your preferred
> > > >>  >>>> monitoring system. Or do you think we end up with too many metrics?
> > > >>  >>>>
> > > >>  >>>>
> > > >>  >>>> 5.
> > > >>  >>>> What do you think about naming the handler ProcessingExceptionHandler
> > > >>  >>>> instead of ProcessExceptionHandler?
> > > >>  >>>> The DeserializationExceptionHandler and the ProductionExceptionHandler
> > > >>  >>>> also use the noun of the action in their name and not the verb.
> > >>  >>>>
> > >>  >>>>
> > >>  >>>> Best,
> > >>  >>>> Bruno
> > >>  >>>>
> > >>  >>>>
> > >>  >>>> On 4/8/24 3:48 PM, Sebastien Viale wrote:
> > >>  >>>>> Thanks for your review!
> > >>  >>>>>
> > >>  >>>>> All the points make sense for us!
> > >>  >>>>>
> > >>  >>>>>
> > >>  >>>>>
> > >>  >>>>> We updated the KIP for points 1 and 4.
> > >>  >>>>>
> > >>  >>>>>
> > >>  >>>>>
> > > >>  >>>>> 2/ We followed the DeserializationExceptionHandler interface
> > > >>  >>>>> signature; it was not on our mind that the record be forwarded with
> > > >>  >>>>> the ProcessorContext.
> > > >>  >>>>>
> > > >>  >>>>> The ProcessingContext is sufficient; we do expect that most people
> > > >>  >>>>> would need to access the RecordMetadata.
> > > >>  >>>>>
> > > >>  >>>>>
> > > >>  >>>>>
> > > >>  >>>>> 3/ The use of Record<Object, Object> is required, as the error could
> > > >>  >>>>> occur in the middle of a processor where records could be
> > > >>  >>>>> non-serializable objects.
> > > >>  >>>>>
> > > >>  >>>>> As this is global error catching, the user may need little
> > > >>  >>>>> information about the faulty record.
> > > >>  >>>>>
> > > >>  >>>>> Assuming that users want to apply some specific treatment to the
> > > >>  >>>>> record, they can add a try / catch block in the topology.
> > > >>  >>>>>
> > > >>  >>>>> It is up to users to cast record value and key in the implementation
> > > >>  >>>>> of the ProcessorExceptionHandler.
> > >>  >>>>>
> > >>  >>>>>
> > >>  >>>>>
> > >>  >>>>> Cheers
> > >>  >>>>>
> > >>  >>>>> Loïc, Damien and Sébastien
> > >>  >>>>>
> > >>  >>>>> ________________________________
> > > >>  >>>>> From: Sophie Blee-Goldman <sop...@responsive.dev>
> > > >>  >>>>> Sent: Saturday, April 6, 2024 01:08
> > > >>  >>>>> To: dev@kafka.apache.org
> > > >>  >>>>> Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception
> > > >>  >>>>> handler for exceptions occuring during processing
> > >>  >>>>>
> > >>  >>>>>
> > >>  >>>>> Hi Damien,
> > >>  >>>>>
> > > >>  >>>>> First off thanks for the KIP, this is definitely a much needed
> > > >>  >>>>> feature. On the whole it seems pretty straightforward and I am in
> > > >>  >>>>> favor of the proposal. Just a few questions and suggestions here
> > > >>  >>>>> and there:
> > >>  >>>>>
> > > >>  >>>>> 1. One of the #handle method's parameters is "ProcessorNode node", but
> > > >>  >>>>> ProcessorNode is an internal class (and would expose a lot of internals
> > > >>  >>>>> that we probably don't want to pass in to an exception handler). Would it
> > > >>  >>>>> be sufficient to just make this a String and pass in the processor name?
> > > >>  >>>>> 2. Another of the parameters is the ProcessorContext. This would enable
> > > >>  >>>>> the handler to potentially forward records, which imo should not be done
> > > >>  >>>>> from the handler since it could only ever call #forward but not direct
> > > >>  >>>>> where the record is actually forwarded to, and could cause confusion if
> > > >>  >>>>> users aren't aware that the handler is effectively calling from the
> > > >>  >>>>> context of the processor that threw the exception.
> > > >>  >>>>> 2a. If you don't explicitly want the ability to forward records, I would
> > > >>  >>>>> suggest changing the type of this parameter to ProcessingContext, which
> > > >>  >>>>> has all the metadata and useful info of the ProcessorContext but without
> > > >>  >>>>> the forwarding APIs. This would also let us sidestep the following issue:
> > > >>  >>>>> 2b. If you *do* want the ability to forward records, setting aside
> > > >>  >>>>> whether that in and of itself makes sense to do, we would need to pass in
> > > >>  >>>>> either a regular ProcessorContext or a FixedKeyProcessorContext, depending
> > > >>  >>>>> on what kind of processor it is. I'm not quite sure how we could design a
> > > >>  >>>>> clean API here, so I'll hold off until you clarify whether you even want
> > > >>  >>>>> forwarding or not.
> > > >>  >>>>> We would also need to split the input record into a Record vs
> > > >>  >>>>> FixedKeyRecord.
> > >>  >>>>>
> > > >>  >>>>> 3. One notable difference between this handler and the existing ones you
> > > >>  >>>>> pointed out, the Deserialization/ProductionExceptionHandler, is that the
> > > >>  >>>>> records passed in to those are in serialized bytes, whereas the record
> > > >>  >>>>> here would be POJOs. You account for this by making the parameter
> > > >>  >>>>> type a Record<Object, Object>, but I just wonder how users would be
> > > >>  >>>>> able to read the key/value and figure out what type it should be. For
> > > >>  >>>>> example, would they need to maintain a map from processor name to
> > > >>  >>>>> input record types?
> > > >>  >>>>>
> > > >>  >>>>> If you could provide an example of this new feature in the KIP, it
> > > >>  >>>>> would be very helpful in understanding whether we could do something to
> > > >>  >>>>> make it easier for users to use, or if it would be fine as-is.
> > > >>  >>>>> 4. We should include all the relevant info for a new metric, such as the
> > > >>  >>>>> metric group and recording level. You can look at other metrics KIPs like
> > > >>  >>>>> KIP-444 and KIP-613 for an example. I suspect you intend for this to be
> > > >>  >>>>> in the processor group and at the INFO level?
> > >>  >>>>>
> > >>  >>>>> Hope that all makes sense! Thanks again for the KIP
> > >>  >>>>>
> > >>  >>>>> -Sophie
> > >>  >>>>>
> > > >>  >>>>> On Fri, Mar 29, 2024 at 6:16 AM Damien Gasparina <d.gaspar...@gmail.com>
> > > >>  >>>>> wrote:
> > >>  >>>>>
> > >>  >>>>>> Hi everyone,
> > >>  >>>>>>
> > > >>  >>>>>> After writing quite a few Kafka Streams applications, my colleagues
> > > >>  >>>>>> and I just created KIP-1033 to introduce a new Exception Handler in
> > > >>  >>>>>> Kafka Streams to simplify error handling.
> > > >>  >>>>>> This feature would allow defining an exception handler to automatically
> > > >>  >>>>>> catch exceptions occurring during the processing of a message.
> > >>  >>>>>>
> > >>  >>>>>> KIP link:
> > >>  >>>>>>
> > >>  >>>>>>
> > >>  >>>
> > >>  >>
> > >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> ><
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> >>
> > >>  >>> <
> > >>  >>>
> > >>  >>
> > >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> ><
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> >>
> > >>  >>>>
> > >>  >>>>>>
> > > >>  >>>>>> Feedback and suggestions are welcome,
> > >>  >>>>>>
> > >>  >>>>>> Cheers,
> > >>  >>>>>> Damien, Sebastien and Loic
> > >>  >>>>>>
> > >>  >>>>>
> > >>  >>>>>
> > >>  >>>>>
> > >>  >>>
> > >>  >>
>
