Hi Bruno,

1) Good point, naming stuff is definitely hard, I renamed
ProcessingMetadata to ErrorHandlerContext

> Is there any reason you did not use something like
> Record<byte[], byte[]> sourceRecord()

2) The record class is used in many locations and, I assume, could be
expanded by new features.
As the error handler metadata could be stored in memory for a longer
duration due to the ProductionExceptionHandle, I think it is wiser to
have an independent class that is the leanest possible.
Maybe I am overthinking this, what do you think?

3) Oops, good point, I didn't notice that it was still there. I
removed the parameter.

4) To clarify the KIP, I specified the DeserializationExceptionHandler
interface in the KIP.

5) Good point, I updated the KIP with the order you mentioned:
context, record, exception

6) I was indeed thinking of extending the ProcessingContext to store
it there. Let me update the KIP to make it clear.

>From past experience, deprecating an interface is always a bit
painful, in this KIP, I relied on default implementation to ensure
backward compatibility while encouraging people to implement the new
method signature. If you know a better approach, I'll take :-)

Cheers,
Damien

On Mon, 22 Apr 2024 at 11:01, Bruno Cadonna <cado...@apache.org> wrote:
>
> Hi Damien,
>
> Thanks a lot for the updates!
>
> I have the following comments:
>
> (1)
> Could you rename ProcessingMetadata to ErrorHandlerContext or
> ErrorHandlerMetadata (I am preferring the former)? I think it makes it
> clearer for what this context/metadata is for.
>
>
> (2)
> Is there any reason you did not use something like
>
> Record<byte[], byte[]> sourceRecord()
>
> in ProcessingMetadata instead of sourceRawKey() and sourceRawValue() and
> headers()? The headers() method refers to the record read from the input
> topic of the sub-topology, right? If yes, maybe that is also something
> to mention more explicitly.
>
>
> (3)
> Since you added the processor node ID to the ProcessingMetadata, you can
> remove it from the signature of method handle() in
> ProcessingExceptionHandler.
>
>
> (4)
> Where are the mentioned changes to the DeserializationExceptionHandler?
>
>
> (5)
> To be consistent, the order of the parameters in the
> ProductionExceptionHandler should be:
> 1. context
> 2. record
> 3. exception
>
>
> (6)
> I am wondering where the implementation of ProcessingMetadata gets the
> sourceRawKey/Value from. Do we need additional changes in
> ProcessingContext and implementations?
>
>
> Best,
> Bruno
>
>
> On 4/21/24 2:23 PM, Damien Gasparina wrote:
> > Hi Everyone,
> >
> > Following the discussions on KIP-1033 and KIP-1034, we did a few changes:
> >    - We introduced a new ProcessingMetadata class containing only the
> > ProcessorContext metadata: topic, partition, offset, headers[],
> > sourceRawKey, sourceRawValue, TaskId, ProcessorNodeName
> >    - To be consistent, we propose to deprecate the existing
> > DeserializationExceptionHandler and ProductionExceptionHandler methods
> > to rely on the new ProcessingMetadata
> >    - The creation and the ProcessingMetadata and the deprecation of old
> > methods is owned by KIP-1033, KIP-1034 (DLQ) is now only focusing on
> > Dead Letter Queue implementation without touching any interfaces. We
> > introduced a hard dependency for KIP-1034 regarding KIP-1033, we think
> > it's the wisest implementation wise.
> >     - Instead of creating a new metric, KIP-1033 updates the
> > dropped-record metric.
> >
> > Let me know what you think, if everything's fine, I think we should be
> > good to start a VOTE?
> >
> > Cheers,
> > Damien
> >
> >
> >
> >
> >
> > On Fri, 12 Apr 2024 at 22:24, Sophie Blee-Goldman <sop...@responsive.dev> 
> > wrote:
> >>
> >> Fully agree about creating a new class for the bits of ProcessingContext
> >> that are specific to metadata only. In fact, more or less this same point
> >> just came up in the related KIP 1034 for DLQs, since the RecordMetadata
> >> can't always be trusted to remain immutable. Maybe it's possible to solve
> >> both issues at once, with the same class?
> >>
> >> On another related note -- I had actually also just proposed that we
> >> deprecate the existing DeserializationExceptionHandler method and replace
> >> it with one using the new PAPI as part of KIP-1034. But now that I'm
> >> reading this, I would say it probably makes more sense to do in this KIP.
> >> We can also push that out into a smaller-scoped third KIP if you want, but
> >> clearly, there is some overlap here and so however you guys (the authors)
> >> want to organize this part of the work is fine with me. I do think it
> >> should be done alongside/before this KIP and 1034 though, for all the
> >> reasons already stated.
> >>
> >> Everything else in the discussion so far I agree with! The
> >> ProcessingContext thing is the only open question in my mind
> >>
> >> On Thu, Apr 11, 2024 at 5:41 AM Damien Gasparina <d.gaspar...@gmail.com>
> >> wrote:
> >>
> >>> Hi Matthias, Bruno,
> >>>
> >>> 1.a During my previous comment, by Processor Node ID, I meant
> >>> Processor name. This is important information to expose in the handler
> >>> as it allows users to identify the location of the exception in the
> >>> topology.
> >>> I assume this information could be useful in other places, that's why
> >>> I would lean toward adding this as an attribute in the
> >>> ProcessingContext.
> >>>
> >>> 1.b Looking at the ProcessingContext, I do think the following 3
> >>> methods should not be accessible in the exception handlers:
> >>> getStateStore(), schedule() and commit().
> >>> Having a separate interface would make a cleaner signature. It would
> >>> also be a great time to ensure that all exception handlers are
> >>> consistent, at the moment, the
> >>> DeserializationExceptionHandler.handle() relies on the old PAPI
> >>> ProcessorContext and the ProductionExceptionHandler.handle() has none.
> >>> It could make sense to build the new interface in this KIP and track
> >>> the effort to migrate the existing handlers in a separate KIP, what do
> >>> you think?
> >>> Maybe I am overthinking this part and the ProcessingContext would be fine.
> >>>
> >>> 4. Good point regarding the dropped-record metric, as it is used by
> >>> the other handlers, I do think it makes sense to leverage it instead
> >>> of creating a new metric.
> >>> I will update the KIP to update the dropped-record-metric.
> >>>
> >>> 8. Regarding the DSL, I am aligned with Bruno, I think we could close
> >>> the gaps in a future KIP.
> >>>
> >>> Cheers,
> >>> Damien
> >>>
> >>>
> >>> On Thu, 11 Apr 2024 at 11:56, Bruno Cadonna <cado...@apache.org> wrote:
> >>>>
> >>>> Hi Matthias,
> >>>>
> >>>>
> >>>> 1.a
> >>>> With processor node ID, I mean the ID that is exposed in the tags of
> >>>> processor node metrics. That ID cannot be internal since it is exposed
> >>>> in metrics. I think the processor name and the processor node ID is the
> >>>> same thing. I followed how the processor node ID is set in metrics and I
> >>>> ended up in addProcessor(name, ...).
> >>>>
> >>>>
> >>>> 1.b
> >>>> Regarding ProcessingContext, I also thought about a separate class to
> >>>> pass-in context information into the handler, but then I dismissed the
> >>>> idea because I thought I was overthinking it. Apparently, I was not
> >>>> overthinking it if you also had the same idea. So let's consider a
> >>>> separate class.
> >>>>
> >>>>
> >>>> 4.
> >>>> Regarding the metric, thanks for pointing to the dropped-record metric,
> >>>> Matthias. The dropped-record metric is used with the deserialization
> >>>> handler and the production handler. So, it would make sense to also use
> >>>> it for this handler. However, the dropped-record metric only records
> >>>> records that are skipped by the handler and not the number of calls to
> >>>> the handler. But that difference is probably irrelevant since in case of
> >>>> FAIL, the metric will be reset anyways since the stream thread will be
> >>>> restarted. In conclusion, I think the dropped-record metric in
> >>>> combination with a warn log message might be the better choice to
> >>>> introducing a new metric.
> >>>>
> >>>>
> >>>> 8.
> >>>> Regarding the DSL, I think we should close possible gaps in a separate
> >>> KIP.
> >>>>
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>> On 4/11/24 12:06 AM, Matthias J. Sax wrote:
> >>>>> Thanks for the KIP. Great discussion.
> >>>>>
> >>>>> I am not sure if I understand the proposal from Bruno to hand in the
> >>>>> processor node id? Isn't this internal (could not even find it
> >>> quickly).
> >>>>> We do have a processor name, right? Or do I mix up something?
> >>>>>
> >>>>> Another question is about `ProcessingContext` -- it contains a lot of
> >>>>> (potentially irrelevant?) metadata. We should think carefully about
> >>> what
> >>>>> we want to pass in and what not -- removing stuff is hard, but adding
> >>>>> stuff is easy. It's always an option to create a new interface that
> >>> only
> >>>>> exposes stuff we find useful, and allows us to evolve this interface
> >>>>> independent of others. Re-using an existing interface always has the
> >>>>> danger to introduce an undesired coupling that could bite us in the
> >>>>> future. -- It make total sense to pass in `RecordMetadata`, but
> >>>>> `ProcessingContext` (even if already limited compared to
> >>>>> `ProcessorContext`) still seems to be too broad? For example, there is
> >>>>> `getStateStore()` and `schedule()` methods which I think we should not
> >>>>> expose.
> >>>>>
> >>>>> The other interesting question is about "what record gets passed in".
> >>>>> For the PAPI, passing in the Processor's input record make a lot of
> >>>>> sense. However, for DSL operators, I am not 100% sure? The DSL often
> >>>>> uses internal types not exposed to the user, and thus I am not sure if
> >>>>> users could write useful code for this case? -- In general, I still
> >>>>> agree that the handler should be implement with a try-catch around
> >>>>> `Processor.process()` but it might not be too useful for DSL processor.
> >>>>> Hence, I am wondering if we need to so something more in the DSL? I
> >>>>> don't have a concrete proposal (a few high level ideas only) and if we
> >>>>> don't do anything special for the DSL I am ok with moving forward with
> >>>>> this KIP as-is, but we should be aware of potential limitations for DSL
> >>>>> users. We can always do a follow up KIP to close gaps when we
> >>> understand
> >>>>> the impact better -- covering the DSL would also expand the scope of
> >>>>> this KIP significantly...
> >>>>>
> >>>>> About the metric: just to double check. Do we think it's worth to add a
> >>>>> new metric? Or could we re-use the existing "dropped record metric"?
> >>>>>
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>>
> >>>>> On 4/10/24 5:11 AM, Sebastien Viale wrote:
> >>>>>> Hi,
> >>>>>>
> >>>>>> You are right, it will simplify types.
> >>>>>>
> >>>>>> We update the KIP
> >>>>>>
> >>>>>> regards
> >>>>>>
> >>>>>> Sébastien *VIALE***
> >>>>>>
> >>>>>> *MICHELIN GROUP* - InfORMATION Technology
> >>>>>> *Technical Expert Kafka*
> >>>>>>
> >>>>>>    Carmes / Bâtiment A17 4e / 63100 Clermont-Ferrand
> >>>>>>
> >>>>>>
> >>> ------------------------------------------------------------------------
> >>>>>> *De :* Bruno Cadonna <cado...@apache.org>
> >>>>>> *Envoyé :* mercredi 10 avril 2024 10:38
> >>>>>> *À :* dev@kafka.apache.org <dev@kafka.apache.org>
> >>>>>> *Objet :* [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception
> >>>>>> handler for exceptions occuring during processing
> >>>>>> Warning External sender Do not click on any links or open any
> >>>>>> attachments unless you trust the sender and know the content is safe.
> >>>>>>
> >>>>>> Hi Loïc, Damien, and Sébastien,
> >>>>>>
> >>>>>> Great that we are converging!
> >>>>>>
> >>>>>>
> >>>>>> 3.
> >>>>>> Damien and Loïc, I think in your examples the handler will receive
> >>>>>> Record<Object, Object> because an Record<Object, Object> is passed to
> >>>>>> the processor in the following code line:
> >>>>>>
> >>> https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152
> >>> <
> >>> https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152
> >>>>
> >>>>>>
> >>>>>> I see that we do not need to pass into the the handler a
> >>> Record<byte[],
> >>>>>> byte[]> just because we do that for the
> >>> DeserializationExceptionHandler
> >>>>>> and the ProductionExceptionHandler. When those two handlers are
> >>> called,
> >>>>>> the record is already serialized. This is not the case for the
> >>>>>> ProcessingExceptionHandler. However, I would propose to use Record<?,
> >>> ?>
> >>>>>> for the record that is passed to the ProcessingExceptionHandler
> >>> because
> >>>>>> it makes the handler API more flexible.
> >>>>>>
> >>>>>>
> >>>>>> Best,
> >>>>>> Bruno
> >>>>>>
> >>>>>> This email was screened for spam and malicious content but exercise
> >>>>>> caution anyway.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On 4/9/24 9:09 PM, Loic Greffier wrote:
> >>>>>>   > Hi Bruno and Bill,
> >>>>>>   >
> >>>>>>   > To complete the Damien's purposes about the point 3.
> >>>>>>   >
> >>>>>>   > Processing errors are caught and handled by the
> >>>>>> ProcessingErrorHandler, at the precise moment when records are
> >>>>>> processed by processor nodes. The handling will be performed in the
> >>>>>> "process" method of the ProcessorNode, such as:
> >>>>>>   >
> >>>>>>   > public void process(final Record<KIn, VIn> record) {
> >>>>>>   > ...
> >>>>>>   >
> >>>>>>   > try {
> >>>>>>   > ...
> >>>>>>   > } catch (final ClassCastException e) {
> >>>>>>   > ...
> >>>>>>   > } catch (Exception e) {
> >>>>>>   > ProcessingExceptionHandler.ProcessingHandlerResponse response =
> >>>>>> this.processingExceptionHandler
> >>>>>>   > .handle(internalProcessorContext, (Record<Object, Object>) record,
> >>> e);
> >>>>>>   >
> >>>>>>   > if (response ==
> >>>>>> ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
> >>>>>>   > throw new StreamsException("Processing exception handler is set to
> >>>>>> fail upon" +
> >>>>>>   > " a processing error. If you would rather have the streaming
> >>>>>> pipeline" +
> >>>>>>   > " continue after a processing error, please set the " +
> >>>>>>   > DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + "
> >>> appropriately.",
> >>>>>>   > e);
> >>>>>>   > }
> >>>>>>   > }
> >>>>>>   > }
> >>>>>>   > As you can see, the record is transmitted to the
> >>>>>> ProcessingExceptionHandler as a Record<Object,Object>, as we are
> >>>>>> dealing with the input record of the processor at this point. It can
> >>>>>> be any type, including non-serializable types, as suggested by the
> >>>>>> Damien's example. As the ProcessingErrorHandler is not intended to
> >>>>>> perform any serialization, there should be no issue for the users to
> >>>>>> handle a Record<Object,Object>.
> >>>>>>   >
> >>>>>>   > I follow Damien on the other points.
> >>>>>>   >
> >>>>>>   > For point 6, underlying public interfaces are renamed as well:
> >>>>>>   > - The ProcessingHandlerResponse
> >>>>>>   > - The
> >>>>>>
> >>> ProcessingLogAndContinueExceptionHandler/ProcessingLogAndFailExceptionHandler
> >>>>>>   > - The configuration
> >>>>>> DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG
> >>>>>> (default.processing.exception.handler)
> >>>>>>   >
> >>>>>>   > Regards,
> >>>>>>   >
> >>>>>>   > Loïc
> >>>>>>   >
> >>>>>>   > De : Damien Gasparina <d.gaspar...@gmail.com>
> >>>>>>   > Envoyé : mardi 9 avril 2024 20:08
> >>>>>>   > À : dev@kafka.apache.org
> >>>>>>   > Objet : Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams
> >>>>>> exception handler for exceptions occuring during processing
> >>>>>>   >
> >>>>>>   > Warning External sender Do not click on any links or open any
> >>>>>> attachments unless you trust the sender and know the content is safe.
> >>>>>>   >
> >>>>>>   > Hi Bruno, Bill,
> >>>>>>   >
> >>>>>>   > First of all, thanks a lot for all your useful comments.
> >>>>>>   >
> >>>>>>   >> 1. and 2.
> >>>>>>   >> I am wondering whether we should expose the processor node ID --
> >>>>>> which
> >>>>>>   >> basically is the processor node name -- in the ProcessingContext
> >>>>>>   >> interface. I think the processor node ID fits well in the
> >>>>>>   >> ProcessingContext interface since it already contains application
> >>>>>> ID and
> >>>>>>   >> task ID and it would make the API for the handler cleaner.
> >>>>>>   >
> >>>>>>   > That's a good point, the actual ProcessorContextImpl is already
> >>>>>> holding the
> >>>>>>   > current node in an attribute (currentNode), thus exposing the node
> >>>>>> ID should
> >>>>>>   > not be a problem. Let me sleep on it and get back to you regarding
> >>>>>> this
> >>>>>>   > point.
> >>>>>>   >
> >>>>>>   >> 3.
> >>>>>>   >> Could you elaborate -- maybe with an example -- when a record is
> >>> in a
> >>>>>>   >> state in which it cannot be serialized? This is not completely
> >>>>>> clear to
> >>>>>>   > me.
> >>>>>>   >
> >>>>>>   > The Record passed to the handler is the input record to the
> >>>>>> processor. In
> >>>>>>   > the Kafka Streams API, it could be any POJO.
> >>>>>>   > e.g. with the following topology `
> >>>>>>   > streamsBuilder.stream("x")
> >>>>>>   > .map((k, v) -> new KeyValue("foo", Pair.of("hello",
> >>>>>>   > "world")))
> >>>>>>   > .forEach((k, v) -> throw new RuntimeException())
> >>>>>>   > I would expect the handler to receive a Record<String, Pair<String,
> >>>>>>   > String>>.
> >>>>>>   >
> >>>>>>   >> 4.
> >>>>>>   >> Regarding the metrics, it is not entirely clear to me what the
> >>> metric
> >>>>>>   >> measures. Is it the number of calls to the process handler or is
> >>>>>> it the
> >>>>>>   >> number of calls to process handler that returned FAIL?
> >>>>>>   >> If it is the former, I was also wondering whether it would be
> >>>>>> better to
> >>>>>>   >> put the task-level metrics to INFO reporting level and remove the
> >>>>>>   >> thread-level metric, similar to the dropped-records metric. You
> >>> can
> >>>>>>   >> always roll-up the metrics to the thread level in your preferred
> >>>>>>   >> monitoring system. Or do you think we end up with to many metrics?
> >>>>>>   >
> >>>>>>   > We were thinking of the former, measuring the number of calls to
> >>> the
> >>>>>>   > process handler. That's a good point, having the information at the
> >>>>>> task
> >>>>>>   > level could be beneficial. I updated the KIP to change the metric
> >>>>>> level
> >>>>>>   > and to clarify the wording.
> >>>>>>   >
> >>>>>>   >> 5.
> >>>>>>   >> What do you think about naming the handler
> >>> ProcessingExceptionHandler
> >>>>>>   >> instead of ProcessExceptionHandler?
> >>>>>>   >> The DeserializationExceptionHanlder and the
> >>>>>> ProductionExceptionHandler
> >>>>>>   >> also use the noun of the action in their name and not the verb.
> >>>>>>   >
> >>>>>>   > Good catch, I updated the KIP to rename it
> >>> ProcessingExceptionHandler.
> >>>>>>   >
> >>>>>>   >> 6.
> >>>>>>   >> What record is exactly passed to the handler?
> >>>>>>   >> Is it the input record to the task? Is it the input record to the
> >>>>>>   >> processor node? Is it the input record to the processor?
> >>>>>>   >
> >>>>>>   > The input record of the processor. I assume that is the most user
> >>>>>>   > friendly record in this context.
> >>>>>>   >
> >>>>>>   >> 7.
> >>>>>>   >> Could you please add the packages of the Java
> >>>>>> classes/interfaces/enums
> >>>>>>   >> you want to add?
> >>>>>>   >
> >>>>>>   > Done, without any surprises: package
> >>> org.apache.kafka.streams.errors;
> >>>>>>   >
> >>>>>>   >
> >>>>>>   > Thanks a lot for your reviews! Cheers,
> >>>>>>   > Damien
> >>>>>>   > This email was screened for spam and malicious content but exercise
> >>>>>> caution anyway.
> >>>>>>   >
> >>>>>>   >
> >>>>>>   >
> >>>>>>   >
> >>>>>>   > On Tue, 9 Apr 2024 at 18:04, Bill Bejeck
> >>>>>> <bbej...@gmail.com<mailto:bbej...@gmail.com>> wrote:
> >>>>>>   >
> >>>>>>   >> Hi Damien, Sebastien and Loic,
> >>>>>>   >>
> >>>>>>   >> Thanks for the KIP, this is a much-needed addition.
> >>>>>>   >> I like the approach of getting the plumbing in for handling
> >>> processor
> >>>>>>   >> errors, allowing users to implement more complex solutions as
> >>> needed.
> >>>>>>   >>
> >>>>>>   >> Overall how where the KIP Is now LGTM, modulo outstanding
> >>> comments. I
> >>>>>>   >> think adding the example you included in this thread to the KIP is
> >>>>>> a great
> >>>>>>   >> idea.
> >>>>>>   >>
> >>>>>>   >> Regarding the metrics, I'm thinking along the same lines as Bruno.
> >>>>>> I'm
> >>>>>>   >> wondering if we can make do with a task-level metric at the INFO
> >>>>>> level and
> >>>>>>   >> the processor metric at DEBUG. IMHO, when it comes to tracking
> >>>>>> exceptions
> >>>>>>   >> in processing, these two areas are where users will want to focus,
> >>>>>> higher
> >>>>>>   >> level metrics wouldn't be as useful in this case.
> >>>>>>   >>
> >>>>>>   >> Thanks,
> >>>>>>   >> Bill
> >>>>>>   >>
> >>>>>>   >> On Tue, Apr 9, 2024 at 6:54 AM Bruno Cadonna
> >>>>>> <cado...@apache.org<mailto:cado...@apache.org>> wrote:
> >>>>>>   >>
> >>>>>>   >>> Hi again,
> >>>>>>   >>>
> >>>>>>   >>> I have additional questions/comments.
> >>>>>>   >>>
> >>>>>>   >>> 6.
> >>>>>>   >>> What record is exactly passed to the handler?
> >>>>>>   >>> Is it the input record to the task? Is it the input record to the
> >>>>>>   >>> processor node? Is it the input record to the processor?
> >>>>>>   >>>
> >>>>>>   >>>
> >>>>>>   >>> 7.
> >>>>>>   >>> Could you please add the packages of the Java
> >>>>>> classes/interfaces/enums
> >>>>>>   >>> you want to add?
> >>>>>>   >>>
> >>>>>>   >>>
> >>>>>>   >>> Best,
> >>>>>>   >>> Bruno
> >>>>>>   >>>
> >>>>>>   >>>
> >>>>>>   >>> On 4/9/24 10:17 AM, Bruno Cadonna wrote:
> >>>>>>   >>>> Hi Loïc, Damien, and Sébastien,
> >>>>>>   >>>>
> >>>>>>   >>>> Thanks for the KIP!
> >>>>>>   >>>> I find it really great that you contribute back to Kafka Streams
> >>>>>>   >>>> concepts you developed for kstreamplify so that everybody can
> >>> take
> >>>>>>   >>>> advantage from your improvements.
> >>>>>>   >>>>
> >>>>>>   >>>> I have a couple of questions/comments:
> >>>>>>   >>>>
> >>>>>>   >>>> 1. and 2.
> >>>>>>   >>>> I am wondering whether we should expose the processor node ID --
> >>>>>> which
> >>>>>>   >>>> basically is the processor node name -- in the ProcessingContext
> >>>>>>   >>>> interface. I think the processor node ID fits well in the
> >>>>>>   >>>> ProcessingContext interface since it already contains
> >>>>>> application ID
> >>>>>>   >> and
> >>>>>>   >>>> task ID and it would make the API for the handler cleaner.
> >>>>>>   >>>>
> >>>>>>   >>>>
> >>>>>>   >>>> 3.
> >>>>>>   >>>> Could you elaborate -- maybe with an example -- when a record is
> >>>>>> in a
> >>>>>>   >>>> state in which it cannot be serialized? This is not completely
> >>>>>> clear to
> >>>>>>   >>> me.
> >>>>>>   >>>>
> >>>>>>   >>>>
> >>>>>>   >>>> 4.
> >>>>>>   >>>> Regarding the metrics, it is not entirely clear to me what the
> >>>>>> metric
> >>>>>>   >>>> measures. Is it the number of calls to the process handler or is
> >>>>>> it the
> >>>>>>   >>>> number of calls to process handler that returned FAIL?
> >>>>>>   >>>> If it is the former, I was also wondering whether it would be
> >>>>>> better to
> >>>>>>   >>>> put the task-level metrics to INFO reporting level and remove
> >>> the
> >>>>>>   >>>> thread-level metric, similar to the dropped-records metric. You
> >>> can
> >>>>>>   >>>> always roll-up the metrics to the thread level in your preferred
> >>>>>>   >>>> monitoring system. Or do you think we end up with to many
> >>> metrics?
> >>>>>>   >>>>
> >>>>>>   >>>>
> >>>>>>   >>>> 5.
> >>>>>>   >>>> What do you think about naming the handler
> >>>>>> ProcessingExceptionHandler
> >>>>>>   >>>> instead of ProcessExceptionHandler?
> >>>>>>   >>>> The DeserializationExceptionHanlder and the
> >>>>>> ProductionExceptionHandler
> >>>>>>   >>>> also use the noun of the action in their name and not the verb.
> >>>>>>   >>>>
> >>>>>>   >>>>
> >>>>>>   >>>> Best,
> >>>>>>   >>>> Bruno
> >>>>>>   >>>>
> >>>>>>   >>>>
> >>>>>>   >>>> On 4/8/24 3:48 PM, Sebastien Viale wrote:
> >>>>>>   >>>>> Thanks for your review!
> >>>>>>   >>>>>
> >>>>>>   >>>>> All the points make sense for us!
> >>>>>>   >>>>>
> >>>>>>   >>>>>
> >>>>>>   >>>>>
> >>>>>>   >>>>> We updated the KIP for points 1 and 4.
> >>>>>>   >>>>>
> >>>>>>   >>>>>
> >>>>>>   >>>>>
> >>>>>>   >>>>> 2/ We followed the DeserializationExceptionHandler interface
> >>>>>>   >>>>> signature, it was not on our mind that the record be forwarded
> >>>>>> with
> >>>>>>   >>>>> the ProcessorContext.
> >>>>>>   >>>>>
> >>>>>>   >>>>> The ProcessingContext is sufficient, we do expect that most
> >>> people
> >>>>>>   >>>>> would need to access the RecordMetadata.
> >>>>>>   >>>>>
> >>>>>>   >>>>>
> >>>>>>   >>>>>
> >>>>>>   >>>>> 3/ The use of Record<Object, Object> is required, as the error
> >>>>>> could
> >>>>>>   >>>>> occurred in the middle of a processor where records could be
> >>> non
> >>>>>>   >>>>> serializable objects
> >>>>>>   >>>>>
> >>>>>>   >>>>> As it is a global error catching, the user may need little
> >>>>>>   >>>>> information about the faulty record.
> >>>>>>   >>>>>
> >>>>>>   >>>>> Assuming that users want to make some specific treatments to
> >>> the
> >>>>>>   >>>>> record, they can add a try / catch block in the topology.
> >>>>>>   >>>>>
> >>>>>>   >>>>> It is up to users to cast record value and key in the
> >>>>>> implementation
> >>>>>>   >>>>> of the ProcessorExceptionHandler.
> >>>>>>   >>>>>
> >>>>>>   >>>>>
> >>>>>>   >>>>>
> >>>>>>   >>>>> Cheers
> >>>>>>   >>>>>
> >>>>>>   >>>>> Loïc, Damien and Sébastien
> >>>>>>   >>>>>
> >>>>>>   >>>>> ________________________________
> >>>>>>   >>>>> De : Sophie Blee-Goldman
> >>>>>> <sop...@responsive.dev<mailto:sop...@responsive.dev>>
> >>>>>>   >>>>> Envoyé : samedi 6 avril 2024 01:08
> >>>>>>   >>>>> À : dev@kafka.apache.org<mailto:dev@kafka.apache.org>
> >>>>>> <dev@kafka.apache.org<mailto:dev@kafka.apache.org>>
> >>>>>>   >>>>> Objet : [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams
> >>> exception
> >>>>>>   >>>>> handler for exceptions occuring during processing
> >>>>>>   >>>>>
> >>>>>>   >>>>> Warning External sender Do not click on any links or open any
> >>>>>>   >>>>> attachments unless you trust the sender and know the content is
> >>>>>> safe.
> >>>>>>   >>>>>
> >>>>>>   >>>>> Hi Damien,
> >>>>>>   >>>>>
> >>>>>>   >>>>> First off thanks for the KIP, this is definitely a much needed
> >>>>>>   >>>>> feature. On
> >>>>>>   >>>>> the
> >>>>>>   >>>>> whole it seems pretty straightforward and I am in favor of the
> >>>>>>   >> proposal.
> >>>>>>   >>>>> Just
> >>>>>>   >>>>> a few questions and suggestions here and there:
> >>>>>>   >>>>>
> >>>>>>   >>>>> 1. One of the #handle method's parameters is "ProcessorNode
> >>>>>> node", but
> >>>>>>   >>>>> ProcessorNode is an internal class (and would expose a lot of
> >>>>>>   >> internals
> >>>>>>   >>>>> that we probably don't want to pass in to an exception
> >>>>>> handler). Would
> >>>>>>   >>> it
> >>>>>>   >>>>> be sufficient to just make this a String and pass in the
> >>> processor
> >>>>>>   >> name?
> >>>>>>   >>>>>
> >>>>>>   >>>>> 2. Another of the parameters in the ProcessorContext. This
> >>> would
> >>>>>>   >> enable
> >>>>>>   >>>>> the handler to potentially forward records, which imo should
> >>>>>> not be
> >>>>>>   >> done
> >>>>>>   >>>>> from the handler since it could only ever call #forward but not
> >>>>>> direct
> >>>>>>   >>>>> where
> >>>>>>   >>>>> the record is actually forwarded to, and could cause confusion
> >>> if
> >>>>>>   >> users
> >>>>>>   >>>>> aren't aware that the handler is effectively calling from the
> >>>>>> context
> >>>>>>   >>>>> of the
> >>>>>>   >>>>> processor that threw the exception.
> >>>>>>   >>>>> 2a. If you don't explicitly want the ability to forward
> >>> records, I
> >>>>>>   >> would
> >>>>>>   >>>>> suggest changing the type of this parameter to
> >>> ProcessingContext,
> >>>>>>   >> which
> >>>>>>   >>>>> has all the metadata and useful info of the ProcessorContext
> >>> but
> >>>>>>   >> without
> >>>>>>   >>>>> the
> >>>>>>   >>>>> forwarding APIs. This would also lets us sidestep the following
> >>>>>> issue:
> >>>>>>   >>>>> 2b. If you *do* want the ability to forward records, setting
> >>> aside
> >>>>>>   >>>>> whether
> >>>>>>   >>>>> that
> >>>>>>   >>>>> in of itself makes sense to do, we would need to pass in
> >>> either a
> >>>>>>   >>> regular
> >>>>>>   >>>>> ProcessorContext or a FixedKeyProcessorContext, depending on
> >>>>>> what kind
> >>>>>>   >>>>> of processor it is. I'm not quite sure how we could design a
> >>>>>> clean API
> >>>>>>   >>>>> here,
> >>>>>>   >>>>> so I'll hold off until you clarify whether you even want
> >>>>>> forwarding or
> >>>>>>   >>>>> not.
> >>>>>>   >>>>> We would also need to split the input record into a Record vs
> >>>>>>   >>>>> FixedKeyRecord
> >>>>>>   >>>>>
> >>>>>>   >>>>> 3. One notable difference between this handler and the existing
> >>>>>> ones
> >>>>>>   >> you
> >>>>>>   >>>>> pointed out, the Deserialization/ProductionExceptionHandler, is
> >>>>>> that
> >>>>>>   >> the
> >>>>>>   >>>>> records passed in to those are in serialized bytes, whereas the
> >>>>>> record
> >>>>>>   >>>>> here would be POJOs. You account for this by making the
> >>> parameter
> >>>>>>   >>>>> type a Record<Object, Object>, but I just wonder how users
> >>>>>> would be
> >>>>>>   >>>>> able to read the key/value and figure out what type it should
> >>>>>> be. For
> >>>>>>   >>>>> example, would they need to maintain a map from processor name
> >>> to
> >>>>>>   >>>>> input record types?
> >>>>>>   >>>>>
> >>>>>>   >>>>> If you could provide an example of this new feature in the
> >>> KIP, it
> >>>>>>   >>>>> would be
> >>>>>>   >>>>> very helpful in understanding whether we could do something to
> >>>>>> make it
> >>>>>>   >>>>> easier for users to use, for if it would be fine as-is
> >>>>>>   >>>>>
> >>>>>>   >>>>> 4. We should include all the relevant info for a new metric,
> >>>>>> such as
> >>>>>>   >> the
> >>>>>>   >>>>> metric
> >>>>>>   >>>>> group and recording level. You can look at other metrics KIPs
> >>> like
> >>>>>>   >>>>> KIP-444
> >>>>>>   >>>>> and KIP-613 for an example. I suspect you intend for this to be
> >>>>>> in the
> >>>>>>   >>>>> processor group and at the INFO level?
> >>>>>>   >>>>>
> >>>>>>   >>>>> Hope that all makes sense! Thanks again for the KIP
> >>>>>>   >>>>>
> >>>>>>   >>>>> -Sophie
> >>>>>>   >>>>>
> >>>>>>   >>>>> On Fri, Mar 29, 2024 at 6:16 AM Damien Gasparina <
> >>>>>>   >> d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com>
> >>>>>>   >>>>
> >>>>>>   >>>>> wrote:
> >>>>>>   >>>>>
> >>>>>>   >>>>>> Hi everyone,
> >>>>>>   >>>>>>
> >>>>>>   >>>>>> After writing quite a few Kafka Streams applications, me and
> >>> my
> >>>>>>   >>>>>> colleagues
> >>>>>>   >>>>>> just created KIP-1033 to introduce a new Exception Handler in
> >>>>>> Kafka
> >>>>>>   >>>>>> Streams
> >>>>>>   >>>>>> to simplify error handling.
> >>>>>>   >>>>>> This feature would allow defining an exception handler to
> >>>>>>   >> automatically
> >>>>>>   >>>>>> catch exceptions occurring during the processing of a message.
> >>>>>>   >>>>>>
> >>>>>>   >>>>>> KIP link:
> >>>>>>   >>>>>>
> >>>>>>   >>>>>>
> >>>>>>   >>>
> >>>>>>   >>
> >>>>>>
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> >>> <
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> >>>> <
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> >>> <
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> >>>>>
> >>>>>>   >>> <
> >>>>>>   >>>
> >>>>>>   >>
> >>>>>>
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> >>> <
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> >>>> <
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> >>> <
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> >>>>>
> >>>>>>   >>>>
> >>>>>>   >>>>>>
> >>>>>>   >>>>>> Feedbacks and suggestions are welcome,
> >>>>>>   >>>>>>
> >>>>>>   >>>>>> Cheers,
> >>>>>>   >>>>>> Damien, Sebastien and Loic
> >>>>>>   >>>>>>
> >>>>>>   >>>>>
> >>>>>>   >>>>> This email was screened for spam and malicious content but
> >>>>>> exercise
> >>>>>>   >>>>> caution anyway.
> >>>>>>   >>>>>
> >>>>>>   >>>>>
> >>>>>>   >>>
> >>>>>>   >>
> >>>

Reply via email to