Hi Everyone,

Following the discussions on KIP-1033 and KIP-1034, we did a few changes:
  - We introduced a new ProcessingMetadata class containing only the
ProcessorContext metadata: topic, partition, offset, headers[],
sourceRawKey, sourceRawValue, TaskId, ProcessorNodeName
  - To be consistent, we propose to deprecate the existing
DeserializationExceptionHandler and ProductionExceptionHandler methods
to rely on the new ProcessingMetadata
  - The creation and the ProcessingMetadata and the deprecation of old
methods is owned by KIP-1033, KIP-1034 (DLQ) is now only focusing on
Dead Letter Queue implementation without touching any interfaces. We
introduced a hard dependency for KIP-1034 regarding KIP-1033, we think
it's the wisest implementation wise.
   - Instead of creating a new metric, KIP-1033 updates the
dropped-record metric.

Let me know what you think, if everything's fine, I think we should be
good to start a VOTE?

Cheers,
Damien





On Fri, 12 Apr 2024 at 22:24, Sophie Blee-Goldman <sop...@responsive.dev> wrote:
>
> Fully agree about creating a new class for the bits of ProcessingContext
> that are specific to metadata only. In fact, more or less this same point
> just came up in the related KIP 1034 for DLQs, since the RecordMetadata
> can't always be trusted to remain immutable. Maybe it's possible to solve
> both issues at once, with the same class?
>
> On another related note -- I had actually also just proposed that we
> deprecate the existing DeserializationExceptionHandler method and replace
> it with one using the new PAPI as part of KIP-1034. But now that I'm
> reading this, I would say it probably makes more sense to do in this KIP.
> We can also push that out into a smaller-scoped third KIP if you want, but
> clearly, there is some overlap here and so however you guys (the authors)
> want to organize this part of the work is fine with me. I do think it
> should be done alongside/before this KIP and 1034 though, for all the
> reasons already stated.
>
> Everything else in the discussion so far I agree with! The
> ProcessingContext thing is the only open question in my mind
>
> On Thu, Apr 11, 2024 at 5:41 AM Damien Gasparina <d.gaspar...@gmail.com>
> wrote:
>
> > Hi Matthias, Bruno,
> >
> > 1.a During my previous comment, by Processor Node ID, I meant
> > Processor name. This is important information to expose in the handler
> > as it allows users to identify the location of the exception in the
> > topology.
> > I assume this information could be useful in other places, that's why
> > I would lean toward adding this as an attribute in the
> > ProcessingContext.
> >
> > 1.b Looking at the ProcessingContext, I do think the following 3
> > methods should not be accessible in the exception handlers:
> > getStateStore(), schedule() and commit().
> > Having a separate interface would make a cleaner signature. It would
> > also be a great time to ensure that all exception handlers are
> > consistent, at the moment, the
> > DeserializationExceptionHandler.handle() relies on the old PAPI
> > ProcessorContext and the ProductionExceptionHandler.handle() has none.
> > It could make sense to build the new interface in this KIP and track
> > the effort to migrate the existing handlers in a separate KIP, what do
> > you think?
> > Maybe I am overthinking this part and the ProcessingContext would be fine.
> >
> > 4. Good point regarding the dropped-record metric, as it is used by
> > the other handlers, I do think it makes sense to leverage it instead
> > of creating a new metric.
> > I will update the KIP to update the dropped-record-metric.
> >
> > 8. Regarding the DSL, I am aligned with Bruno, I think we could close
> > the gaps in a future KIP.
> >
> > Cheers,
> > Damien
> >
> >
> > On Thu, 11 Apr 2024 at 11:56, Bruno Cadonna <cado...@apache.org> wrote:
> > >
> > > Hi Matthias,
> > >
> > >
> > > 1.a
> > > With processor node ID, I mean the ID that is exposed in the tags of
> > > processor node metrics. That ID cannot be internal since it is exposed
> > > in metrics. I think the processor name and the processor node ID is the
> > > same thing. I followed how the processor node ID is set in metrics and I
> > > ended up in addProcessor(name, ...).
> > >
> > >
> > > 1.b
> > > Regarding ProcessingContext, I also thought about a separate class to
> > > pass-in context information into the handler, but then I dismissed the
> > > idea because I thought I was overthinking it. Apparently, I was not
> > > overthinking it if you also had the same idea. So let's consider a
> > > separate class.
> > >
> > >
> > > 4.
> > > Regarding the metric, thanks for pointing to the dropped-record metric,
> > > Matthias. The dropped-record metric is used with the deserialization
> > > handler and the production handler. So, it would make sense to also use
> > > it for this handler. However, the dropped-record metric only records
> > > records that are skipped by the handler and not the number of calls to
> > > the handler. But that difference is probably irrelevant since in case of
> > > FAIL, the metric will be reset anyways since the stream thread will be
> > > restarted. In conclusion, I think the dropped-record metric in
> > > combination with a warn log message might be the better choice to
> > > introducing a new metric.
> > >
> > >
> > > 8.
> > > Regarding the DSL, I think we should close possible gaps in a separate
> > KIP.
> > >
> > >
> > > Best,
> > > Bruno
> > >
> > > On 4/11/24 12:06 AM, Matthias J. Sax wrote:
> > > > Thanks for the KIP. Great discussion.
> > > >
> > > > I am not sure if I understand the proposal from Bruno to hand in the
> > > > processor node id? Isn't this internal (could not even find it
> > quickly).
> > > > We do have a processor name, right? Or do I mix up something?
> > > >
> > > > Another question is about `ProcessingContext` -- it contains a lot of
> > > > (potentially irrelevant?) metadata. We should think carefully about
> > what
> > > > we want to pass in and what not -- removing stuff is hard, but adding
> > > > stuff is easy. It's always an option to create a new interface that
> > only
> > > > exposes stuff we find useful, and allows us to evolve this interface
> > > > independent of others. Re-using an existing interface always has the
> > > > danger to introduce an undesired coupling that could bite us in the
> > > > future. -- It make total sense to pass in `RecordMetadata`, but
> > > > `ProcessingContext` (even if already limited compared to
> > > > `ProcessorContext`) still seems to be too broad? For example, there is
> > > > `getStateStore()` and `schedule()` methods which I think we should not
> > > > expose.
> > > >
> > > > The other interesting question is about "what record gets passed in".
> > > > For the PAPI, passing in the Processor's input record make a lot of
> > > > sense. However, for DSL operators, I am not 100% sure? The DSL often
> > > > uses internal types not exposed to the user, and thus I am not sure if
> > > > users could write useful code for this case? -- In general, I still
> > > > agree that the handler should be implement with a try-catch around
> > > > `Processor.process()` but it might not be too useful for DSL processor.
> > > > Hence, I am wondering if we need to so something more in the DSL? I
> > > > don't have a concrete proposal (a few high level ideas only) and if we
> > > > don't do anything special for the DSL I am ok with moving forward with
> > > > this KIP as-is, but we should be aware of potential limitations for DSL
> > > > users. We can always do a follow up KIP to close gaps when we
> > understand
> > > > the impact better -- covering the DSL would also expand the scope of
> > > > this KIP significantly...
> > > >
> > > > About the metric: just to double check. Do we think it's worth to add a
> > > > new metric? Or could we re-use the existing "dropped record metric"?
> > > >
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > On 4/10/24 5:11 AM, Sebastien Viale wrote:
> > > >> Hi,
> > > >>
> > > >> You are right, it will simplify types.
> > > >>
> > > >> We update the KIP
> > > >>
> > > >> regards
> > > >>
> > > >> Sébastien *VIALE***
> > > >>
> > > >> *MICHELIN GROUP* - InfORMATION Technology
> > > >> *Technical Expert Kafka*
> > > >>
> > > >>   Carmes / Bâtiment A17 4e / 63100 Clermont-Ferrand
> > > >>
> > > >>
> > ------------------------------------------------------------------------
> > > >> *De :* Bruno Cadonna <cado...@apache.org>
> > > >> *Envoyé :* mercredi 10 avril 2024 10:38
> > > >> *À :* dev@kafka.apache.org <dev@kafka.apache.org>
> > > >> *Objet :* [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception
> > > >> handler for exceptions occuring during processing
> > > >> Warning External sender Do not click on any links or open any
> > > >> attachments unless you trust the sender and know the content is safe.
> > > >>
> > > >> Hi Loïc, Damien, and Sébastien,
> > > >>
> > > >> Great that we are converging!
> > > >>
> > > >>
> > > >> 3.
> > > >> Damien and Loïc, I think in your examples the handler will receive
> > > >> Record<Object, Object> because an Record<Object, Object> is passed to
> > > >> the processor in the following code line:
> > > >>
> > https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152
> > <
> > https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152
> > >
> > > >>
> > > >> I see that we do not need to pass into the the handler a
> > Record<byte[],
> > > >> byte[]> just because we do that for the
> > DeserializationExceptionHandler
> > > >> and the ProductionExceptionHandler. When those two handlers are
> > called,
> > > >> the record is already serialized. This is not the case for the
> > > >> ProcessingExceptionHandler. However, I would propose to use Record<?,
> > ?>
> > > >> for the record that is passed to the ProcessingExceptionHandler
> > because
> > > >> it makes the handler API more flexible.
> > > >>
> > > >>
> > > >> Best,
> > > >> Bruno
> > > >>
> > > >> This email was screened for spam and malicious content but exercise
> > > >> caution anyway.
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> On 4/9/24 9:09 PM, Loic Greffier wrote:
> > > >>  > Hi Bruno and Bill,
> > > >>  >
> > > >>  > To complete the Damien's purposes about the point 3.
> > > >>  >
> > > >>  > Processing errors are caught and handled by the
> > > >> ProcessingErrorHandler, at the precise moment when records are
> > > >> processed by processor nodes. The handling will be performed in the
> > > >> "process" method of the ProcessorNode, such as:
> > > >>  >
> > > >>  > public void process(final Record<KIn, VIn> record) {
> > > >>  > ...
> > > >>  >
> > > >>  > try {
> > > >>  > ...
> > > >>  > } catch (final ClassCastException e) {
> > > >>  > ...
> > > >>  > } catch (Exception e) {
> > > >>  > ProcessingExceptionHandler.ProcessingHandlerResponse response =
> > > >> this.processingExceptionHandler
> > > >>  > .handle(internalProcessorContext, (Record<Object, Object>) record,
> > e);
> > > >>  >
> > > >>  > if (response ==
> > > >> ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
> > > >>  > throw new StreamsException("Processing exception handler is set to
> > > >> fail upon" +
> > > >>  > " a processing error. If you would rather have the streaming
> > > >> pipeline" +
> > > >>  > " continue after a processing error, please set the " +
> > > >>  > DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + "
> > appropriately.",
> > > >>  > e);
> > > >>  > }
> > > >>  > }
> > > >>  > }
> > > >>  > As you can see, the record is transmitted to the
> > > >> ProcessingExceptionHandler as a Record<Object,Object>, as we are
> > > >> dealing with the input record of the processor at this point. It can
> > > >> be any type, including non-serializable types, as suggested by the
> > > >> Damien's example. As the ProcessingErrorHandler is not intended to
> > > >> perform any serialization, there should be no issue for the users to
> > > >> handle a Record<Object,Object>.
> > > >>  >
> > > >>  > I follow Damien on the other points.
> > > >>  >
> > > >>  > For point 6, underlying public interfaces are renamed as well:
> > > >>  > - The ProcessingHandlerResponse
> > > >>  > - The
> > > >>
> > ProcessingLogAndContinueExceptionHandler/ProcessingLogAndFailExceptionHandler
> > > >>  > - The configuration
> > > >> DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG
> > > >> (default.processing.exception.handler)
> > > >>  >
> > > >>  > Regards,
> > > >>  >
> > > >>  > Loïc
> > > >>  >
> > > >>  > De : Damien Gasparina <d.gaspar...@gmail.com>
> > > >>  > Envoyé : mardi 9 avril 2024 20:08
> > > >>  > À : dev@kafka.apache.org
> > > >>  > Objet : Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams
> > > >> exception handler for exceptions occuring during processing
> > > >>  >
> > > >>  > Warning External sender Do not click on any links or open any
> > > >> attachments unless you trust the sender and know the content is safe.
> > > >>  >
> > > >>  > Hi Bruno, Bill,
> > > >>  >
> > > >>  > First of all, thanks a lot for all your useful comments.
> > > >>  >
> > > >>  >> 1. and 2.
> > > >>  >> I am wondering whether we should expose the processor node ID --
> > > >> which
> > > >>  >> basically is the processor node name -- in the ProcessingContext
> > > >>  >> interface. I think the processor node ID fits well in the
> > > >>  >> ProcessingContext interface since it already contains application
> > > >> ID and
> > > >>  >> task ID and it would make the API for the handler cleaner.
> > > >>  >
> > > >>  > That's a good point, the actual ProcessorContextImpl is already
> > > >> holding the
> > > >>  > current node in an attribute (currentNode), thus exposing the node
> > > >> ID should
> > > >>  > not be a problem. Let me sleep on it and get back to you regarding
> > > >> this
> > > >>  > point.
> > > >>  >
> > > >>  >> 3.
> > > >>  >> Could you elaborate -- maybe with an example -- when a record is
> > in a
> > > >>  >> state in which it cannot be serialized? This is not completely
> > > >> clear to
> > > >>  > me.
> > > >>  >
> > > >>  > The Record passed to the handler is the input record to the
> > > >> processor. In
> > > >>  > the Kafka Streams API, it could be any POJO.
> > > >>  > e.g. with the following topology `
> > > >>  > streamsBuilder.stream("x")
> > > >>  > .map((k, v) -> new KeyValue("foo", Pair.of("hello",
> > > >>  > "world")))
> > > >>  > .forEach((k, v) -> throw new RuntimeException())
> > > >>  > I would expect the handler to receive a Record<String, Pair<String,
> > > >>  > String>>.
> > > >>  >
> > > >>  >> 4.
> > > >>  >> Regarding the metrics, it is not entirely clear to me what the
> > metric
> > > >>  >> measures. Is it the number of calls to the process handler or is
> > > >> it the
> > > >>  >> number of calls to process handler that returned FAIL?
> > > >>  >> If it is the former, I was also wondering whether it would be
> > > >> better to
> > > >>  >> put the task-level metrics to INFO reporting level and remove the
> > > >>  >> thread-level metric, similar to the dropped-records metric. You
> > can
> > > >>  >> always roll-up the metrics to the thread level in your preferred
> > > >>  >> monitoring system. Or do you think we end up with to many metrics?
> > > >>  >
> > > >>  > We were thinking of the former, measuring the number of calls to
> > the
> > > >>  > process handler. That's a good point, having the information at the
> > > >> task
> > > >>  > level could be beneficial. I updated the KIP to change the metric
> > > >> level
> > > >>  > and to clarify the wording.
> > > >>  >
> > > >>  >> 5.
> > > >>  >> What do you think about naming the handler
> > ProcessingExceptionHandler
> > > >>  >> instead of ProcessExceptionHandler?
> > > >>  >> The DeserializationExceptionHanlder and the
> > > >> ProductionExceptionHandler
> > > >>  >> also use the noun of the action in their name and not the verb.
> > > >>  >
> > > >>  > Good catch, I updated the KIP to rename it
> > ProcessingExceptionHandler.
> > > >>  >
> > > >>  >> 6.
> > > >>  >> What record is exactly passed to the handler?
> > > >>  >> Is it the input record to the task? Is it the input record to the
> > > >>  >> processor node? Is it the input record to the processor?
> > > >>  >
> > > >>  > The input record of the processor. I assume that is the most user
> > > >>  > friendly record in this context.
> > > >>  >
> > > >>  >> 7.
> > > >>  >> Could you please add the packages of the Java
> > > >> classes/interfaces/enums
> > > >>  >> you want to add?
> > > >>  >
> > > >>  > Done, without any surprises: package
> > org.apache.kafka.streams.errors;
> > > >>  >
> > > >>  >
> > > >>  > Thanks a lot for your reviews! Cheers,
> > > >>  > Damien
> > > >>  > This email was screened for spam and malicious content but exercise
> > > >> caution anyway.
> > > >>  >
> > > >>  >
> > > >>  >
> > > >>  >
> > > >>  > On Tue, 9 Apr 2024 at 18:04, Bill Bejeck
> > > >> <bbej...@gmail.com<mailto:bbej...@gmail.com>> wrote:
> > > >>  >
> > > >>  >> Hi Damien, Sebastien and Loic,
> > > >>  >>
> > > >>  >> Thanks for the KIP, this is a much-needed addition.
> > > >>  >> I like the approach of getting the plumbing in for handling
> > processor
> > > >>  >> errors, allowing users to implement more complex solutions as
> > needed.
> > > >>  >>
> > > >>  >> Overall how where the KIP Is now LGTM, modulo outstanding
> > comments. I
> > > >>  >> think adding the example you included in this thread to the KIP is
> > > >> a great
> > > >>  >> idea.
> > > >>  >>
> > > >>  >> Regarding the metrics, I'm thinking along the same lines as Bruno.
> > > >> I'm
> > > >>  >> wondering if we can make do with a task-level metric at the INFO
> > > >> level and
> > > >>  >> the processor metric at DEBUG. IMHO, when it comes to tracking
> > > >> exceptions
> > > >>  >> in processing, these two areas are where users will want to focus,
> > > >> higher
> > > >>  >> level metrics wouldn't be as useful in this case.
> > > >>  >>
> > > >>  >> Thanks,
> > > >>  >> Bill
> > > >>  >>
> > > >>  >> On Tue, Apr 9, 2024 at 6:54 AM Bruno Cadonna
> > > >> <cado...@apache.org<mailto:cado...@apache.org>> wrote:
> > > >>  >>
> > > >>  >>> Hi again,
> > > >>  >>>
> > > >>  >>> I have additional questions/comments.
> > > >>  >>>
> > > >>  >>> 6.
> > > >>  >>> What record is exactly passed to the handler?
> > > >>  >>> Is it the input record to the task? Is it the input record to the
> > > >>  >>> processor node? Is it the input record to the processor?
> > > >>  >>>
> > > >>  >>>
> > > >>  >>> 7.
> > > >>  >>> Could you please add the packages of the Java
> > > >> classes/interfaces/enums
> > > >>  >>> you want to add?
> > > >>  >>>
> > > >>  >>>
> > > >>  >>> Best,
> > > >>  >>> Bruno
> > > >>  >>>
> > > >>  >>>
> > > >>  >>> On 4/9/24 10:17 AM, Bruno Cadonna wrote:
> > > >>  >>>> Hi Loïc, Damien, and Sébastien,
> > > >>  >>>>
> > > >>  >>>> Thanks for the KIP!
> > > >>  >>>> I find it really great that you contribute back to Kafka Streams
> > > >>  >>>> concepts you developed for kstreamplify so that everybody can
> > take
> > > >>  >>>> advantage from your improvements.
> > > >>  >>>>
> > > >>  >>>> I have a couple of questions/comments:
> > > >>  >>>>
> > > >>  >>>> 1. and 2.
> > > >>  >>>> I am wondering whether we should expose the processor node ID --
> > > >> which
> > > >>  >>>> basically is the processor node name -- in the ProcessingContext
> > > >>  >>>> interface. I think the processor node ID fits well in the
> > > >>  >>>> ProcessingContext interface since it already contains
> > > >> application ID
> > > >>  >> and
> > > >>  >>>> task ID and it would make the API for the handler cleaner.
> > > >>  >>>>
> > > >>  >>>>
> > > >>  >>>> 3.
> > > >>  >>>> Could you elaborate -- maybe with an example -- when a record is
> > > >> in a
> > > >>  >>>> state in which it cannot be serialized? This is not completely
> > > >> clear to
> > > >>  >>> me.
> > > >>  >>>>
> > > >>  >>>>
> > > >>  >>>> 4.
> > > >>  >>>> Regarding the metrics, it is not entirely clear to me what the
> > > >> metric
> > > >>  >>>> measures. Is it the number of calls to the process handler or is
> > > >> it the
> > > >>  >>>> number of calls to process handler that returned FAIL?
> > > >>  >>>> If it is the former, I was also wondering whether it would be
> > > >> better to
> > > >>  >>>> put the task-level metrics to INFO reporting level and remove
> > the
> > > >>  >>>> thread-level metric, similar to the dropped-records metric. You
> > can
> > > >>  >>>> always roll-up the metrics to the thread level in your preferred
> > > >>  >>>> monitoring system. Or do you think we end up with to many
> > metrics?
> > > >>  >>>>
> > > >>  >>>>
> > > >>  >>>> 5.
> > > >>  >>>> What do you think about naming the handler
> > > >> ProcessingExceptionHandler
> > > >>  >>>> instead of ProcessExceptionHandler?
> > > >>  >>>> The DeserializationExceptionHanlder and the
> > > >> ProductionExceptionHandler
> > > >>  >>>> also use the noun of the action in their name and not the verb.
> > > >>  >>>>
> > > >>  >>>>
> > > >>  >>>> Best,
> > > >>  >>>> Bruno
> > > >>  >>>>
> > > >>  >>>>
> > > >>  >>>> On 4/8/24 3:48 PM, Sebastien Viale wrote:
> > > >>  >>>>> Thanks for your review!
> > > >>  >>>>>
> > > >>  >>>>> All the points make sense for us!
> > > >>  >>>>>
> > > >>  >>>>>
> > > >>  >>>>>
> > > >>  >>>>> We updated the KIP for points 1 and 4.
> > > >>  >>>>>
> > > >>  >>>>>
> > > >>  >>>>>
> > > >>  >>>>> 2/ We followed the DeserializationExceptionHandler interface
> > > >>  >>>>> signature, it was not on our mind that the record be forwarded
> > > >> with
> > > >>  >>>>> the ProcessorContext.
> > > >>  >>>>>
> > > >>  >>>>> The ProcessingContext is sufficient, we do expect that most
> > people
> > > >>  >>>>> would need to access the RecordMetadata.
> > > >>  >>>>>
> > > >>  >>>>>
> > > >>  >>>>>
> > > >>  >>>>> 3/ The use of Record<Object, Object> is required, as the error
> > > >> could
> > > >>  >>>>> occurred in the middle of a processor where records could be
> > non
> > > >>  >>>>> serializable objects
> > > >>  >>>>>
> > > >>  >>>>> As it is a global error catching, the user may need little
> > > >>  >>>>> information about the faulty record.
> > > >>  >>>>>
> > > >>  >>>>> Assuming that users want to make some specific treatments to
> > the
> > > >>  >>>>> record, they can add a try / catch block in the topology.
> > > >>  >>>>>
> > > >>  >>>>> It is up to users to cast record value and key in the
> > > >> implementation
> > > >>  >>>>> of the ProcessorExceptionHandler.
> > > >>  >>>>>
> > > >>  >>>>>
> > > >>  >>>>>
> > > >>  >>>>> Cheers
> > > >>  >>>>>
> > > >>  >>>>> Loïc, Damien and Sébastien
> > > >>  >>>>>
> > > >>  >>>>> ________________________________
> > > >>  >>>>> De : Sophie Blee-Goldman
> > > >> <sop...@responsive.dev<mailto:sop...@responsive.dev>>
> > > >>  >>>>> Envoyé : samedi 6 avril 2024 01:08
> > > >>  >>>>> À : dev@kafka.apache.org<mailto:dev@kafka.apache.org>
> > > >> <dev@kafka.apache.org<mailto:dev@kafka.apache.org>>
> > > >>  >>>>> Objet : [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams
> > exception
> > > >>  >>>>> handler for exceptions occuring during processing
> > > >>  >>>>>
> > > >>  >>>>> Warning External sender Do not click on any links or open any
> > > >>  >>>>> attachments unless you trust the sender and know the content is
> > > >> safe.
> > > >>  >>>>>
> > > >>  >>>>> Hi Damien,
> > > >>  >>>>>
> > > >>  >>>>> First off thanks for the KIP, this is definitely a much needed
> > > >>  >>>>> feature. On
> > > >>  >>>>> the
> > > >>  >>>>> whole it seems pretty straightforward and I am in favor of the
> > > >>  >> proposal.
> > > >>  >>>>> Just
> > > >>  >>>>> a few questions and suggestions here and there:
> > > >>  >>>>>
> > > >>  >>>>> 1. One of the #handle method's parameters is "ProcessorNode
> > > >> node", but
> > > >>  >>>>> ProcessorNode is an internal class (and would expose a lot of
> > > >>  >> internals
> > > >>  >>>>> that we probably don't want to pass in to an exception
> > > >> handler). Would
> > > >>  >>> it
> > > >>  >>>>> be sufficient to just make this a String and pass in the
> > processor
> > > >>  >> name?
> > > >>  >>>>>
> > > >>  >>>>> 2. Another of the parameters in the ProcessorContext. This
> > would
> > > >>  >> enable
> > > >>  >>>>> the handler to potentially forward records, which imo should
> > > >> not be
> > > >>  >> done
> > > >>  >>>>> from the handler since it could only ever call #forward but not
> > > >> direct
> > > >>  >>>>> where
> > > >>  >>>>> the record is actually forwarded to, and could cause confusion
> > if
> > > >>  >> users
> > > >>  >>>>> aren't aware that the handler is effectively calling from the
> > > >> context
> > > >>  >>>>> of the
> > > >>  >>>>> processor that threw the exception.
> > > >>  >>>>> 2a. If you don't explicitly want the ability to forward
> > records, I
> > > >>  >> would
> > > >>  >>>>> suggest changing the type of this parameter to
> > ProcessingContext,
> > > >>  >> which
> > > >>  >>>>> has all the metadata and useful info of the ProcessorContext
> > but
> > > >>  >> without
> > > >>  >>>>> the
> > > >>  >>>>> forwarding APIs. This would also lets us sidestep the following
> > > >> issue:
> > > >>  >>>>> 2b. If you *do* want the ability to forward records, setting
> > aside
> > > >>  >>>>> whether
> > > >>  >>>>> that
> > > >>  >>>>> in of itself makes sense to do, we would need to pass in
> > either a
> > > >>  >>> regular
> > > >>  >>>>> ProcessorContext or a FixedKeyProcessorContext, depending on
> > > >> what kind
> > > >>  >>>>> of processor it is. I'm not quite sure how we could design a
> > > >> clean API
> > > >>  >>>>> here,
> > > >>  >>>>> so I'll hold off until you clarify whether you even want
> > > >> forwarding or
> > > >>  >>>>> not.
> > > >>  >>>>> We would also need to split the input record into a Record vs
> > > >>  >>>>> FixedKeyRecord
> > > >>  >>>>>
> > > >>  >>>>> 3. One notable difference between this handler and the existing
> > > >> ones
> > > >>  >> you
> > > >>  >>>>> pointed out, the Deserialization/ProductionExceptionHandler, is
> > > >> that
> > > >>  >> the
> > > >>  >>>>> records passed in to those are in serialized bytes, whereas the
> > > >> record
> > > >>  >>>>> here would be POJOs. You account for this by making the
> > parameter
> > > >>  >>>>> type a Record<Object, Object>, but I just wonder how users
> > > >> would be
> > > >>  >>>>> able to read the key/value and figure out what type it should
> > > >> be. For
> > > >>  >>>>> example, would they need to maintain a map from processor name
> > to
> > > >>  >>>>> input record types?
> > > >>  >>>>>
> > > >>  >>>>> If you could provide an example of this new feature in the
> > KIP, it
> > > >>  >>>>> would be
> > > >>  >>>>> very helpful in understanding whether we could do something to
> > > >> make it
> > > >>  >>>>> easier for users to use, for if it would be fine as-is
> > > >>  >>>>>
> > > >>  >>>>> 4. We should include all the relevant info for a new metric,
> > > >> such as
> > > >>  >> the
> > > >>  >>>>> metric
> > > >>  >>>>> group and recording level. You can look at other metrics KIPs
> > like
> > > >>  >>>>> KIP-444
> > > >>  >>>>> and KIP-613 for an example. I suspect you intend for this to be
> > > >> in the
> > > >>  >>>>> processor group and at the INFO level?
> > > >>  >>>>>
> > > >>  >>>>> Hope that all makes sense! Thanks again for the KIP
> > > >>  >>>>>
> > > >>  >>>>> -Sophie
> > > >>  >>>>>
> > > >>  >>>>> On Fri, Mar 29, 2024 at 6:16 AM Damien Gasparina <
> > > >>  >> d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com>
> > > >>  >>>>
> > > >>  >>>>> wrote:
> > > >>  >>>>>
> > > >>  >>>>>> Hi everyone,
> > > >>  >>>>>>
> > > >>  >>>>>> After writing quite a few Kafka Streams applications, me and
> > my
> > > >>  >>>>>> colleagues
> > > >>  >>>>>> just created KIP-1033 to introduce a new Exception Handler in
> > > >> Kafka
> > > >>  >>>>>> Streams
> > > >>  >>>>>> to simplify error handling.
> > > >>  >>>>>> This feature would allow defining an exception handler to
> > > >>  >> automatically
> > > >>  >>>>>> catch exceptions occurring during the processing of a message.
> > > >>  >>>>>>
> > > >>  >>>>>> KIP link:
> > > >>  >>>>>>
> > > >>  >>>>>>
> > > >>  >>>
> > > >>  >>
> > > >>
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> > <
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> > ><
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> > <
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> > >>
> > > >>  >>> <
> > > >>  >>>
> > > >>  >>
> > > >>
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> > <
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> > ><
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> > <
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> > >>
> > > >>  >>>>
> > > >>  >>>>>>
> > > >>  >>>>>> Feedbacks and suggestions are welcome,
> > > >>  >>>>>>
> > > >>  >>>>>> Cheers,
> > > >>  >>>>>> Damien, Sebastien and Loic
> > > >>  >>>>>>
> > > >>  >>>>>
> > > >>  >>>>> This email was screened for spam and malicious content but
> > > >> exercise
> > > >>  >>>>> caution anyway.
> > > >>  >>>>>
> > > >>  >>>>>
> > > >>  >>>
> > > >>  >>
> >

Reply via email to