Hi Everyone,

Following the discussions on KIP-1033 and KIP-1034, we made a few changes:
- We introduced a new ProcessingMetadata class containing only the ProcessorContext metadata: topic, partition, offset, headers[], sourceRawKey, sourceRawValue, TaskId, ProcessorNodeName.
- To be consistent, we propose to deprecate the existing DeserializationExceptionHandler and ProductionExceptionHandler methods in favor of methods relying on the new ProcessingMetadata.
- The creation of the ProcessingMetadata and the deprecation of the old methods are owned by KIP-1033; KIP-1034 (DLQ) now focuses only on the Dead Letter Queue implementation, without touching any interfaces. This introduces a hard dependency of KIP-1034 on KIP-1033, which we think is the wisest choice implementation-wise.
- Instead of creating a new metric, KIP-1033 updates the dropped-record metric.
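A minimal sketch of what such a metadata-only type could look like, in plain Java so it runs without Kafka on the classpath. The accessor names and the `DefaultProcessingMetadata` class are hypothetical illustrations, not the API that was finally adopted; `headers` is simplified to a `Map<String, byte[]>` (Kafka's real headers can repeat keys) and `TaskId` to a `String`. The point it illustrates is that a handler receives a read-only snapshot with no `getStateStore()`, `schedule()`, `commit()` or `forward()`.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical metadata-only view handed to exception handlers (illustrative, not the final Kafka API).
interface ProcessingMetadata {
    String topic();
    int partition();
    long offset();
    Map<String, byte[]> headers();   // simplified stand-in for Kafka's Headers
    byte[] sourceRawKey();           // raw bytes of the source record's key
    byte[] sourceRawValue();         // raw bytes of the source record's value
    String taskId();                 // simplified stand-in for TaskId
    String processorNodeName();      // node in which the exception was thrown
}

// Snapshot implementation: copies the header map and the raw key/value bytes it is given,
// so later changes to the live processing context cannot leak into the handler's view.
final class DefaultProcessingMetadata implements ProcessingMetadata {
    private final String topic;
    private final int partition;
    private final long offset;
    private final Map<String, byte[]> headers;
    private final byte[] sourceRawKey;
    private final byte[] sourceRawValue;
    private final String taskId;
    private final String processorNodeName;

    DefaultProcessingMetadata(String topic, int partition, long offset, Map<String, byte[]> headers,
                              byte[] sourceRawKey, byte[] sourceRawValue,
                              String taskId, String processorNodeName) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.headers = Collections.unmodifiableMap(new LinkedHashMap<>(headers));
        this.sourceRawKey = copy(sourceRawKey);
        this.sourceRawValue = copy(sourceRawValue);
        this.taskId = taskId;
        this.processorNodeName = processorNodeName;
    }

    private static byte[] copy(byte[] bytes) {
        return bytes == null ? null : bytes.clone();
    }

    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }
    @Override public long offset() { return offset; }
    @Override public Map<String, byte[]> headers() { return headers; }
    @Override public byte[] sourceRawKey() { return copy(sourceRawKey); }
    @Override public byte[] sourceRawValue() { return copy(sourceRawValue); }
    @Override public String taskId() { return taskId; }
    @Override public String processorNodeName() { return processorNodeName; }
}
```

Returning copies of the raw bytes keeps the snapshot stable even if a handler mutates what it receives, which matches the concern raised below that RecordMetadata cannot always be trusted to remain immutable.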
Let me know what you think. If everything's fine, I think we should be good to start a VOTE? Cheers, Damien On Fri, 12 Apr 2024 at 22:24, Sophie Blee-Goldman <sop...@responsive.dev> wrote: > > Fully agree about creating a new class for the bits of ProcessingContext > that are specific to metadata only. In fact, more or less this same point > just came up in the related KIP 1034 for DLQs, since the RecordMetadata > can't always be trusted to remain immutable. Maybe it's possible to solve > both issues at once, with the same class? > > On another related note -- I had actually also just proposed that we > deprecate the existing DeserializationExceptionHandler method and replace > it with one using the new PAPI as part of KIP-1034. But now that I'm > reading this, I would say it probably makes more sense to do in this KIP. > We can also push that out into a smaller-scoped third KIP if you want, but > clearly, there is some overlap here and so however you guys (the authors) > want to organize this part of the work is fine with me. I do think it > should be done alongside/before this KIP and 1034 though, for all the > reasons already stated. > > Everything else in the discussion so far I agree with! The > ProcessingContext thing is the only open question in my mind. > > On Thu, Apr 11, 2024 at 5:41 AM Damien Gasparina <d.gaspar...@gmail.com> > wrote: > > > Hi Matthias, Bruno, > > > > 1.a In my previous comment, by Processor Node ID, I meant > > the processor name. This is important information to expose in the handler > > as it allows users to identify the location of the exception in the > > topology. > > I assume this information could be useful in other places; that's why > > I would lean toward adding this as an attribute in the > > ProcessingContext. > > > > 1.b Looking at the ProcessingContext, I do think the following 3 > > methods should not be accessible in the exception handlers: > > getStateStore(), schedule() and commit().
> > Having a separate interface would make a cleaner signature. It would > > also be a great time to ensure that all exception handlers are > > consistent: at the moment, the > > DeserializationExceptionHandler.handle() relies on the old PAPI > > ProcessorContext and the ProductionExceptionHandler.handle() has none. > > It could make sense to build the new interface in this KIP and track > > the effort to migrate the existing handlers in a separate KIP. What do > > you think? > > Maybe I am overthinking this part and the ProcessingContext would be fine. > > > > 4. Good point regarding the dropped-record metric: as it is used by > > the other handlers, I do think it makes sense to leverage it instead > > of creating a new metric. > > I will update the KIP to use the dropped-record metric. > > > > 8. Regarding the DSL, I am aligned with Bruno; I think we could close > > the gaps in a future KIP. > > > > Cheers, > > Damien > > > > > > On Thu, 11 Apr 2024 at 11:56, Bruno Cadonna <cado...@apache.org> wrote: > > > > > > Hi Matthias, > > > > > > > > > 1.a > > > By processor node ID, I mean the ID that is exposed in the tags of > > > processor node metrics. That ID cannot be internal since it is exposed > > > in metrics. I think the processor name and the processor node ID are the > > > same thing. I followed how the processor node ID is set in metrics and I > > > ended up in addProcessor(name, ...). > > > > > > > > > 1.b > > > Regarding ProcessingContext, I also thought about a separate class to > > > pass context information into the handler, but then I dismissed the > > > idea because I thought I was overthinking it. Apparently, I was not > > > overthinking it if you also had the same idea. So let's consider a > > > separate class. > > > > > > > > > 4. > > > Regarding the metric, thanks for pointing to the dropped-record metric, > > > Matthias. The dropped-record metric is used with the deserialization > > > handler and the production handler.
So, it would make sense to also use > > > it for this handler. However, the dropped-record metric only records > > > records that are skipped by the handler and not the number of calls to > > > the handler. But that difference is probably irrelevant since in case of > > > FAIL, the metric will be reset anyway since the stream thread will be > > > restarted. In conclusion, I think the dropped-record metric in > > > combination with a warn log message might be a better choice than > > > introducing a new metric. > > > > > > > > > 8. > > > Regarding the DSL, I think we should close possible gaps in a separate > > KIP. > > > > > > > > > Best, > > > Bruno > > > > > > On 4/11/24 12:06 AM, Matthias J. Sax wrote: > > > > Thanks for the KIP. Great discussion. > > > > > > > > I am not sure I understand the proposal from Bruno to hand in the > > > > processor node id. Isn't this internal? (I could not even find it > > quickly.) > > > > We do have a processor name, right? Or do I mix up something? > > > > > > > > Another question is about `ProcessingContext` -- it contains a lot of > > > > (potentially irrelevant?) metadata. We should think carefully about > > what > > > > we want to pass in and what not -- removing stuff is hard, but adding > > > > stuff is easy. It's always an option to create a new interface that > > only > > > > exposes stuff we find useful, and allows us to evolve this interface > > > > independently of others. Re-using an existing interface always carries the > > > > danger of introducing an undesired coupling that could bite us in the > > > > future. -- It makes total sense to pass in `RecordMetadata`, but > > > > `ProcessingContext` (even if already limited compared to > > > > `ProcessorContext`) still seems to be too broad? For example, there are > > > > `getStateStore()` and `schedule()` methods, which I think we should not > > > > expose. > > > > > > > > The other interesting question is about "what record gets passed in".
> > > > For the PAPI, passing in the Processor's input record makes a lot of > > > > sense. However, for DSL operators, I am not 100% sure? The DSL often > > > > uses internal types not exposed to the user, and thus I am not sure if > > > > users could write useful code for this case? -- In general, I still > > > > agree that the handler should be implemented with a try-catch around > > > > `Processor.process()` but it might not be too useful for DSL processors. > > > > Hence, I am wondering if we need to do something more in the DSL? I > > > > don't have a concrete proposal (a few high-level ideas only) and if we > > > > don't do anything special for the DSL I am ok with moving forward with > > > > this KIP as-is, but we should be aware of potential limitations for DSL > > > > users. We can always do a follow-up KIP to close gaps when we > > understand > > > > the impact better -- covering the DSL would also expand the scope of > > > > this KIP significantly... > > > > > > > > About the metric: just to double check. Do we think it's worth adding a > > > > new metric? Or could we re-use the existing "dropped record metric"? > > > > > > > > > > > > > > > > -Matthias > > > > > > > > > > > > On 4/10/24 5:11 AM, Sebastien Viale wrote: > > > >> Hi, > > > >> > > > >> You are right, it will simplify types.
> > > >> > > > >> We updated the KIP. > > > >> > > > >> Regards, > > > >> > > > >> Sébastien VIALE > > > >> > > > >> MICHELIN GROUP - Information Technology > > > >> Technical Expert Kafka > > > >> > > > >> Carmes / Bâtiment A17 4e / 63100 Clermont-Ferrand > > > >> > > > >> > > ------------------------------------------------------------------------ > > > >> From: Bruno Cadonna <cado...@apache.org> > > > >> Sent: Wednesday, 10 April 2024 10:38 > > > >> To: dev@kafka.apache.org > > > >> Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception > > > >> handler for exceptions occuring during processing > > > >> > > > >> Hi Loïc, Damien, and Sébastien, > > > >> > > > >> Great that we are converging! > > > >> > > > >> > > > >> 3. > > > >> Damien and Loïc, I think in your examples the handler will receive > > > >> Record<Object, Object> because a Record<Object, Object> is passed to > > > >> the processor in the following code line: > > > >> > > https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152 > > > >> > > > >> I see that we do not need to pass into the handler a > > Record<byte[], > > > >> byte[]> just because we do that for the > > DeserializationExceptionHandler > > > >> and the ProductionExceptionHandler. When those two handlers are > > called, > > > >> the record is already serialized. This is not the case for the > > > >> ProcessingExceptionHandler.
However, I would propose to use Record<?, ?> > > > >> for the record that is passed to the ProcessingExceptionHandler > > because > > > >> it makes the handler API more flexible. > > > >> > > > >> > > > >> Best, > > > >> Bruno > > > >> > > > >> > > > >> > > > >> > > > >> On 4/9/24 9:09 PM, Loic Greffier wrote: > > > >> > Hi Bruno and Bill, > > > >> > > > > >> > To complement Damien's answer on point 3: > > > >> > > > > >> > Processing errors are caught and handled by the > > > >> ProcessingExceptionHandler at the precise moment when records are > > > >> processed by processor nodes. The handling will be performed in the > > > >> "process" method of the ProcessorNode, such as: > > > >> > > > > >> > public void process(final Record<KIn, VIn> record) { > > > >> > ... > > > >> > > > > >> > try { > > > >> > ... > > > >> > } catch (final ClassCastException e) { > > > >> > ... > > > >> > } catch (Exception e) { > > > >> > ProcessingExceptionHandler.ProcessingHandlerResponse response = > > > >> this.processingExceptionHandler > > > >> > .handle(internalProcessorContext, (Record<Object, Object>) record, > > e); > > > >> > > > > >> > if (response == > > > >> ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) { > > > >> > throw new StreamsException("Processing exception handler is set to > > > >> fail upon" + > > > >> > " a processing error. If you would rather have the streaming > > > >> pipeline" + > > > >> > " continue after a processing error, please set the " + > > > >> > DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " > > appropriately.", > > > >> > e); > > > >> > } > > > >> > } > > > >> > } > > > >> > As you can see, the record is passed to the > > > >> ProcessingExceptionHandler as a Record<Object,Object>, as we are > > > >> dealing with the input record of the processor at this point.
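The catch-and-delegate flow in the quoted `process()` snippet can be sketched without Kafka on the classpath. Everything below is a hypothetical, simplified mirror of the names used in the thread (`SimpleRecord`, `SimpleProcessor`, `ProcessorNodeSketch`, and a `handle(processorName, record, exception)` signature are assumptions), the separate `ClassCastException` branch is omitted, and `IllegalStateException` stands in for `StreamsException`. The handler parameter is typed `SimpleRecord<?, ?>`, following Bruno's `Record<?, ?>` suggestion: Java generics are invariant, so a `SimpleRecord<String, Integer>` is a `SimpleRecord<?, ?>` but not a `SimpleRecord<Object, Object>`, and the wildcard removes the need for the unchecked cast.

```java
// Hypothetical stand-in for Kafka's Record<K, V>.
final class SimpleRecord<K, V> {
    final K key;
    final V value;

    SimpleRecord(K key, V value) {
        this.key = key;
        this.value = value;
    }
}

enum ProcessingHandlerResponse { CONTINUE, FAIL }

// Hypothetical handler contract: the wildcard accepts records of any key/value types without a cast.
interface ProcessingExceptionHandler {
    ProcessingHandlerResponse handle(String processorName, SimpleRecord<?, ?> record, Exception exception);
}

interface SimpleProcessor<K, V> {
    void process(SimpleRecord<K, V> record);
}

// Wraps a processor the way the quoted ProcessorNode.process() wraps Processor.process().
final class ProcessorNodeSketch<K, V> {
    private final String name;
    private final SimpleProcessor<K, V> processor;
    private final ProcessingExceptionHandler handler;

    ProcessorNodeSketch(String name, SimpleProcessor<K, V> processor, ProcessingExceptionHandler handler) {
        this.name = name;
        this.processor = processor;
        this.handler = handler;
    }

    void process(SimpleRecord<K, V> record) {
        try {
            processor.process(record);
        } catch (RuntimeException e) {
            // No cast needed: SimpleRecord<K, V> is assignable to SimpleRecord<?, ?>.
            ProcessingHandlerResponse response = handler.handle(name, record, e);
            if (response == ProcessingHandlerResponse.FAIL) {
                // Stand-in for StreamsException: stops processing and keeps the original cause.
                throw new IllegalStateException("Processing exception handler is set to fail upon a processing error.", e);
            }
            // CONTINUE: the failing record is dropped and the next record can be processed.
        }
    }
}
```

With a CONTINUE handler, a record that makes the processor throw is reported to the handler and skipped; with a FAIL handler, the exception propagates wrapped, which is what stops the stream thread in the real implementation.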
It can > > > >> be any type, including non-serializable types, as shown by > > > >> Damien's example. As the ProcessingExceptionHandler is not intended to > > > >> perform any serialization, there should be no issue for users > > > >> handling a Record<Object,Object>. > > > >> > > > > >> > I follow Damien on the other points. > > > >> > > > > >> > For point 6, underlying public interfaces are renamed as well: > > > >> > - The ProcessingHandlerResponse > > > >> > - The > > > >> > > ProcessingLogAndContinueExceptionHandler/ProcessingLogAndFailExceptionHandler > > > >> > - The configuration > > > >> DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG > > > >> (default.processing.exception.handler) > > > >> > > > > >> > Regards, > > > >> > > > > >> > Loïc > > > >> > > > > >> > From: Damien Gasparina <d.gaspar...@gmail.com> > > > >> > Sent: Tuesday, 9 April 2024 20:08 > > > >> > To: dev@kafka.apache.org > > > >> > Subject: Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams > > > >> exception handler for exceptions occuring during processing > > > >> > > > > >> > Hi Bruno, Bill, > > > >> > > > > >> > First of all, thanks a lot for all your useful comments. > > > >> > > > > >> >> 1. and 2. > > > >> >> I am wondering whether we should expose the processor node ID -- > > > >> which > > > >> >> basically is the processor node name -- in the ProcessingContext > > > >> >> interface. I think the processor node ID fits well in the > > > >> >> ProcessingContext interface since it already contains application > > > >> ID and > > > >> >> task ID and it would make the API for the handler cleaner.
> > > >> > > > > >> > That's a good point; the current ProcessorContextImpl already > > > >> holds the > > > >> > current node in an attribute (currentNode), so exposing the node > > > >> ID should > > > >> > not be a problem. Let me sleep on it and get back to you regarding > > > >> this > > > >> > point. > > > >> > > > > >> >> 3. > > > >> >> Could you elaborate -- maybe with an example -- when a record is > > in a > > > >> >> state in which it cannot be serialized? This is not completely > > > >> clear to > > > >> > me. > > > >> > > > > >> > The Record passed to the handler is the input record to the > > > >> processor. In > > > >> > the Kafka Streams API, it could be any POJO, > > > >> > e.g. with the following topology: > > > >> > streamsBuilder.stream("x") > > > >> > .map((k, v) -> KeyValue.pair("foo", Pair.of("hello", > > > >> > "world"))) > > > >> > .foreach((k, v) -> { throw new RuntimeException(); }); > > > >> > I would expect the handler to receive a Record<String, Pair<String, > > > >> > String>>. > > > >> > > > > >> >> 4. > > > >> >> Regarding the metrics, it is not entirely clear to me what the > > metric > > > >> >> measures. Is it the number of calls to the process handler or is > > > >> it the > > > >> >> number of calls to the process handler that returned FAIL? > > > >> >> If it is the former, I was also wondering whether it would be > > > >> better to > > > >> >> put the task-level metrics to INFO reporting level and remove the > > > >> >> thread-level metric, similar to the dropped-records metric. You > > can > > > >> >> always roll up the metrics to the thread level in your preferred > > > >> >> monitoring system. Or do you think we end up with too many metrics? > > > >> > > > > >> > We were thinking of the former, measuring the number of calls to > > the > > > >> > process handler. That's a good point; having the information at the > > > >> task > > > >> > level could be beneficial.
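Because the handler only sees the record as `Record<Object, Object>`, a handler for the topology in point 3 has to recover the value type at runtime. A minimal sketch, assuming a hypothetical two-field `Pair` in place of the `Pair` used in the example, and plain `Object` key/value parameters in place of the record; `RecordDescriber` is likewise an illustrative name.

```java
// Hypothetical minimal pair type standing in for the Pair used in the example topology.
final class Pair<L, R> {
    final L left;
    final R right;

    private Pair(L left, R right) {
        this.left = left;
        this.right = right;
    }

    static <L, R> Pair<L, R> of(L left, R right) {
        return new Pair<>(left, right);
    }
}

final class RecordDescriber {
    // Key and value arrive as Object, so the handler must test the runtime type before using them.
    static String describe(Object key, Object value) {
        if (value instanceof Pair) {
            Pair<?, ?> pair = (Pair<?, ?>) value;   // safe: only the wildcard type is assumed
            return "key=" + key + ", pair=(" + pair.left + ", " + pair.right + ")";
        }
        String type = value == null ? "null" : value.getClass().getSimpleName();
        return "key=" + key + ", unexpected value type " + type;
    }
}
```

Since generic type arguments are erased at runtime, a handler can check that the value is a `Pair` but not that it is a `Pair<String, String>`; the element types still have to be checked individually if they matter.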
I updated the KIP to change the metric > > > >> level > > > >> > and to clarify the wording. > > > >> > > > > >> >> 5. > > > >> >> What do you think about naming the handler > > ProcessingExceptionHandler > > > >> >> instead of ProcessExceptionHandler? > > > >> >> The DeserializationExceptionHandler and the > > > >> ProductionExceptionHandler > > > >> >> also use the noun of the action in their name and not the verb. > > > >> > > > > >> > Good catch, I updated the KIP to rename it > > ProcessingExceptionHandler. > > > >> > > > > >> >> 6. > > > >> >> What record is exactly passed to the handler? > > > >> >> Is it the input record to the task? Is it the input record to the > > > >> >> processor node? Is it the input record to the processor? > > > >> > > > > >> > The input record of the processor. I assume that is the most > > > >> > user-friendly record in this context. > > > >> > > > > >> >> 7. > > > >> >> Could you please add the packages of the Java > > > >> classes/interfaces/enums > > > >> >> you want to add? > > > >> > > > > >> > Done, without any surprises: package > > org.apache.kafka.streams.errors; > > > >> > > > > >> > > > > >> > Thanks a lot for your reviews! Cheers, > > > >> > Damien > > > >> > > > > >> > On Tue, 9 Apr 2024 at 18:04, Bill Bejeck > > > >> <bbej...@gmail.com> wrote: > > > >> > > > > >> >> Hi Damien, Sebastien and Loic, > > > >> >> > > > >> >> Thanks for the KIP, this is a much-needed addition. > > > >> >> I like the approach of getting the plumbing in for handling > > processor > > > >> >> errors, allowing users to implement more complex solutions as > > needed. > > > >> >> > > > >> >> Overall, where the KIP is now LGTM, modulo outstanding > > comments. I > > > >> >> think adding the example you included in this thread to the KIP is > > > >> a great > > > >> >> idea.
> > > >> >> > > > >> >> Regarding the metrics, I'm thinking along the same lines as Bruno. > > > >> I'm > > > >> >> wondering if we can make do with a task-level metric at the INFO > > > >> level and > > > >> >> the processor metric at DEBUG. IMHO, when it comes to tracking > > > >> exceptions > > > >> >> in processing, these two areas are where users will want to focus; > > > >> higher-level > > > >> >> metrics wouldn't be as useful in this case. > > > >> >> > > > >> >> Thanks, > > > >> >> Bill > > > >> >> > > > >> >> On Tue, Apr 9, 2024 at 6:54 AM Bruno Cadonna > > > >> <cado...@apache.org> wrote: > > > >> >> > > > >> >>> Hi again, > > > >> >>> > > > >> >>> I have additional questions/comments. > > > >> >>> > > > >> >>> 6. > > > >> >>> What record is exactly passed to the handler? > > > >> >>> Is it the input record to the task? Is it the input record to the > > > >> >>> processor node? Is it the input record to the processor? > > > >> >>> > > > >> >>> > > > >> >>> 7. > > > >> >>> Could you please add the packages of the Java > > > >> classes/interfaces/enums > > > >> >>> you want to add? > > > >> >>> > > > >> >>> > > > >> >>> Best, > > > >> >>> Bruno > > > >> >>> > > > >> >>> > > > >> >>> On 4/9/24 10:17 AM, Bruno Cadonna wrote: > > > >> >>>> Hi Loïc, Damien, and Sébastien, > > > >> >>>> > > > >> >>>> Thanks for the KIP! > > > >> >>>> I find it really great that you contribute back to Kafka Streams > > > >> >>>> concepts you developed for kstreamplify so that everybody can > > take > > > >> >>>> advantage of your improvements. > > > >> >>>> > > > >> >>>> I have a couple of questions/comments: > > > >> >>>> > > > >> >>>> 1. and 2. > > > >> >>>> I am wondering whether we should expose the processor node ID -- > > > >> which > > > >> >>>> basically is the processor node name -- in the ProcessingContext > > > >> >>>> interface.
I think the processor node ID fits well in the > > > >> >>>> ProcessingContext interface since it already contains > > > >> application ID > > > >> >> and > > > >> >>>> task ID and it would make the API for the handler cleaner. > > > >> >>>> > > > >> >>>> > > > >> >>>> 3. > > > >> >>>> Could you elaborate -- maybe with an example -- when a record is > > > >> in a > > > >> >>>> state in which it cannot be serialized? This is not completely > > > >> clear to > > > >> >>> me. > > > >> >>>> > > > >> >>>> > > > >> >>>> 4. > > > >> >>>> Regarding the metrics, it is not entirely clear to me what the > > > >> metric > > > >> >>>> measures. Is it the number of calls to the process handler or is > > > >> it the > > > >> >>>> number of calls to the process handler that returned FAIL? > > > >> >>>> If it is the former, I was also wondering whether it would be > > > >> better to > > > >> >>>> put the task-level metrics to INFO reporting level and remove > > the > > > >> >>>> thread-level metric, similar to the dropped-records metric. You > > can > > > >> >>>> always roll up the metrics to the thread level in your preferred > > > >> >>>> monitoring system. Or do you think we end up with too many > > metrics? > > > >> >>>> > > > >> >>>> > > > >> >>>> 5. > > > >> >>>> What do you think about naming the handler > > > >> ProcessingExceptionHandler > > > >> >>>> instead of ProcessExceptionHandler? > > > >> >>>> The DeserializationExceptionHandler and the > > > >> ProductionExceptionHandler > > > >> >>>> also use the noun of the action in their name and not the verb. > > > >> >>>> > > > >> >>>> > > > >> >>>> Best, > > > >> >>>> Bruno > > > >> >>>> > > > >> >>>> > > > >> >>>> On 4/8/24 3:48 PM, Sebastien Viale wrote: > > > >> >>>>> Thanks for your review! > > > >> >>>>> > > > >> >>>>> All the points make sense to us! > > > >> >>>>> > > > >> >>>>> > > > >> >>>>> > > > >> >>>>> We updated the KIP for points 1 and 4.
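Point 4 above suggests keeping the metric at task level and rolling it up to the thread level in the monitoring system. Outside Kafka, that roll-up is a group-and-sum; a minimal sketch, assuming a hypothetical data shape in which task-level counts are keyed by task ID and a separate map records which stream thread owns each task. `MetricRollUp`, the task IDs and the thread names are illustrative only.

```java
import java.util.Map;
import java.util.TreeMap;

final class MetricRollUp {
    // taskCounts:   task ID -> value of a task-level counter (hypothetical data shape)
    // taskToThread: task ID -> name of the stream thread that owns the task
    static Map<String, Long> rollUpToThreads(Map<String, Long> taskCounts, Map<String, String> taskToThread) {
        Map<String, Long> totals = new TreeMap<>();   // sorted by thread name for stable output
        for (Map.Entry<String, Long> entry : taskCounts.entrySet()) {
            // Tasks with no known owner are grouped under "unknown" instead of being silently dropped.
            String thread = taskToThread.getOrDefault(entry.getKey(), "unknown");
            totals.merge(thread, entry.getValue(), Long::sum);
        }
        return totals;
    }
}
```

Aggregating upward like this is lossless, whereas a thread-level metric alone cannot be split back into tasks, which is one argument for recording at the finer level.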
> > > >> >>>>> > > > >> >>>>> > > > >> >>>>> > > > >> >>>>> 2/ We followed the DeserializationExceptionHandler interface > > > >> >>>>> signature; we did not intend for the record to be forwarded > > > >> through > > > >> >>>>> the ProcessorContext. > > > >> >>>>> > > > >> >>>>> The ProcessingContext is sufficient; we do expect that most > > people > > > >> >>>>> would need to access the RecordMetadata. > > > >> >>>>> > > > >> >>>>> > > > >> >>>>> > > > >> >>>>> 3/ The use of Record<Object, Object> is required, as the error > > > >> could > > > >> >>>>> occur in the middle of a processor where records could be > > > >> >>>>> non-serializable objects. > > > >> >>>>> > > > >> >>>>> As it is global error catching, the user may need little > > > >> >>>>> information about the faulty record. > > > >> >>>>> > > > >> >>>>> Assuming that users want to apply some specific treatment to > > the > > > >> >>>>> record, they can add a try / catch block in the topology. > > > >> >>>>> > > > >> >>>>> It is up to users to cast the record key and value in the > > > >> implementation > > > >> >>>>> of the ProcessorExceptionHandler. > > > >> >>>>> > > > >> >>>>> > > > >> >>>>> > > > >> >>>>> Cheers > > > >> >>>>> > > > >> >>>>> Loïc, Damien and Sébastien > > > >> >>>>> > > > >> >>>>> ________________________________ > > > >> >>>>> From: Sophie Blee-Goldman > > > >> <sop...@responsive.dev> > > > >> >>>>> Sent: Saturday, 6 April 2024 01:08 > > > >> >>>>> To: dev@kafka.apache.org > > > >> >>>>> Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams > > exception > > > >> >>>>> handler for exceptions occuring during processing
> > > >> >>>>> > > > >> >>>>> Hi Damien, > > > >> >>>>> > > > >> >>>>> First off, thanks for the KIP; this is definitely a much-needed > > > >> >>>>> feature. On > > > >> >>>>> the > > > >> >>>>> whole it seems pretty straightforward and I am in favor of the > > > >> >> proposal. > > > >> >>>>> Just > > > >> >>>>> a few questions and suggestions here and there: > > > >> >>>>> > > > >> >>>>> 1. One of the #handle method's parameters is "ProcessorNode > > > >> node", but > > > >> >>>>> ProcessorNode is an internal class (and would expose a lot of > > > >> >> internals > > > >> >>>>> that we probably don't want to pass in to an exception > > > >> handler). Would > > > >> >>> it > > > >> >>>>> be sufficient to just make this a String and pass in the > > processor > > > >> >> name? > > > >> >>>>> > > > >> >>>>> 2. Another of the parameters is the ProcessorContext. This > > would > > > >> >> enable > > > >> >>>>> the handler to potentially forward records, which imo should > > > >> not be > > > >> >> done > > > >> >>>>> from the handler since it could only ever call #forward but not > > > >> direct > > > >> >>>>> where > > > >> >>>>> the record is actually forwarded to, and could cause confusion > > if > > > >> >> users > > > >> >>>>> aren't aware that the handler is effectively calling from the > > > >> context > > > >> >>>>> of the > > > >> >>>>> processor that threw the exception. > > > >> >>>>> 2a. If you don't explicitly want the ability to forward > > records, I > > > >> >> would > > > >> >>>>> suggest changing the type of this parameter to > > ProcessingContext, > > > >> >> which > > > >> >>>>> has all the metadata and useful info of the ProcessorContext > > but > > > >> >> without > > > >> >>>>> the > > > >> >>>>> forwarding APIs. This would also let us sidestep the following > > > >> issue: > > > >> >>>>> 2b.
If you *do* want the ability to forward records, setting > > aside > > > >> >>>>> whether > > > >> >>>>> that > > > >> >>>>> in and of itself makes sense to do, we would need to pass in > > either a > > > >> >>> regular > > > >> >>>>> ProcessorContext or a FixedKeyProcessorContext, depending on > > > >> what kind > > > >> >>>>> of processor it is. I'm not quite sure how we could design a > > > >> clean API > > > >> >>>>> here, > > > >> >>>>> so I'll hold off until you clarify whether you even want > > > >> forwarding or > > > >> >>>>> not. > > > >> >>>>> We would also need to split the input record into a Record vs > > > >> >>>>> FixedKeyRecord. > > > >> >>>>> > > > >> >>>>> 3. One notable difference between this handler and the existing > > > >> ones > > > >> >> you > > > >> >>>>> pointed out, the Deserialization/ProductionExceptionHandler, is > > > >> that > > > >> >> the > > > >> >>>>> records passed in to those are in serialized bytes, whereas the > > > >> record > > > >> >>>>> here would be POJOs. You account for this by making the > > parameter > > > >> >>>>> type a Record<Object, Object>, but I just wonder how users > > > >> would be > > > >> >>>>> able to read the key/value and figure out what type it should > > > >> be. For > > > >> >>>>> example, would they need to maintain a map from processor name > > to > > > >> >>>>> input record types? > > > >> >>>>> > > > >> >>>>> If you could provide an example of this new feature in the > > KIP, it > > > >> >>>>> would be > > > >> >>>>> very helpful in understanding whether we could do something to > > > >> make it > > > >> >>>>> easier for users to use, or if it would be fine as-is. > > > >> >>>>> > > > >> >>>>> 4. We should include all the relevant info for a new metric, > > > >> such as > > > >> >> the > > > >> >>>>> metric > > > >> >>>>> group and recording level. You can look at other metrics KIPs > > like > > > >> >>>>> KIP-444 > > > >> >>>>> and KIP-613 for an example.
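The "map from processor name to input record types" that point 3 asks about could look roughly like the following minimal sketch. It assumes the handler receives the processor name (as proposed in point 1) and that users register the expected value type for each named processor by hand; `InputTypeRegistry`, its methods, and the processor names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical user-side registry: processor name -> expected input value type.
final class InputTypeRegistry {
    private final Map<String, Class<?>> valueTypes = new HashMap<>();

    InputTypeRegistry register(String processorName, Class<?> valueType) {
        valueTypes.put(processorName, valueType);
        return this;
    }

    // Returns the value typed as `type` only if the processor was registered with exactly that type
    // and the runtime object really is an instance of it; otherwise an empty Optional.
    <T> Optional<T> valueAs(String processorName, Object value, Class<T> type) {
        Class<?> expected = valueTypes.get(processorName);
        if (type.equals(expected) && type.isInstance(value)) {
            return Optional.of(type.cast(value));
        }
        return Optional.empty();
    }
}
```

The registry has to be kept in sync with the topology by hand, which is exactly the maintenance burden the question points at.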
I suspect you intend for this to be > > > >> in the > > > >> >>>>> processor group and at the INFO level? > > > >> >>>>> > > > >> >>>>> Hope that all makes sense! Thanks again for the KIP. > > > >> >>>>> > > > >> >>>>> -Sophie > > > >> >>>>> > > > >> >>>>> On Fri, Mar 29, 2024 at 6:16 AM Damien Gasparina > > > >> >> <d.gaspar...@gmail.com> > > > >> >>>>> wrote: > > > >> >>>>> > > > >> >>>>>> Hi everyone, > > > >> >>>>>> > > > >> >>>>>> After writing quite a few Kafka Streams applications, my > > > >> >>>>>> colleagues and I > > > >> >>>>>> just created KIP-1033 to introduce a new Exception Handler in > > > >> Kafka > > > >> >>>>>> Streams > > > >> >>>>>> to simplify error handling. > > > >> >>>>>> This feature would allow defining an exception handler to > > > >> >> automatically > > > >> >>>>>> catch exceptions occurring during the processing of a message. > > > >> >>>>>> > > > >> >>>>>> KIP link: > > > >> >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing > > > >> >>>>>> > > > >> >>>>>> Feedback and suggestions are welcome, > > > >> >>>>>> > > > >> >>>>>> Cheers, > > > >> >>>>>> Damien, Sebastien and Loic