Hi Bruno,

1) Good point, naming stuff is definitely hard. I renamed ProcessingMetadata to ErrorHandlerContext.

> Is there any reason you did not use something like
> Record<byte[], byte[]> sourceRecord()

2) The record class is used in many locations and, I assume, could be expanded by new features. As the error handler metadata could be stored in memory for a longer duration due to the ProductionExceptionHandler, I think it is wiser to have an independent class that is as lean as possible. Maybe I am overthinking this, what do you think?

3) Oops, good point, I didn't notice that it was still there. I removed the parameter.

4) To clarify the KIP, I specified the DeserializationExceptionHandler interface in the KIP.

5) Good point, I updated the KIP with the order you mentioned: context, record, exception.

6) I was indeed thinking of extending the ProcessingContext to store it there. Let me update the KIP to make it clear.

From past experience, deprecating an interface is always a bit painful. In this KIP, I relied on default implementations to ensure backward compatibility while encouraging people to implement the new method signature. If you know a better approach, I'll take it :-)

Cheers,
Damien

On Mon, 22 Apr 2024 at 11:01, Bruno Cadonna <cado...@apache.org> wrote:
>
> Hi Damien,
>
> Thanks a lot for the updates!
>
> I have the following comments:
>
> (1)
> Could you rename ProcessingMetadata to ErrorHandlerContext or
> ErrorHandlerMetadata (I am preferring the former)? I think it makes it
> clearer what this context/metadata is for.
>
> (2)
> Is there any reason you did not use something like
>
> Record<byte[], byte[]> sourceRecord()
>
> in ProcessingMetadata instead of sourceRawKey() and sourceRawValue() and
> headers()? The headers() method refers to the record read from the input
> topic of the sub-topology, right? If yes, maybe that is also something
> to mention more explicitly.
>
> (3)
> Since you added the processor node ID to the ProcessingMetadata, you can
> remove it from the signature of method handle() in
> ProcessingExceptionHandler.
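Damien's note above about relying on default implementations for backward compatibility can be sketched roughly as follows. All names here (the enum, the interface, the method shapes) are illustrative stand-ins, not the actual KIP-1033 or Kafka API:

```java
// Illustrative sketch only: a new handler method with a default
// implementation delegates to the deprecated one, so existing handler
// implementations keep compiling. Names do not match the real Kafka API.
class BackCompatSketch {

    enum Response { CONTINUE, FAIL }

    interface ErrorHandlerContext {
        String topic();
    }

    interface ExceptionHandler {
        // Old signature, kept for backward compatibility.
        @Deprecated
        default Response handle(byte[] rawKey, byte[] rawValue, Exception e) {
            return Response.FAIL;
        }

        // New signature (context, record, exception); by default it delegates
        // to the old method, so legacy implementations still work unchanged.
        default Response handle(ErrorHandlerContext context,
                                byte[] rawKey, byte[] rawValue, Exception e) {
            return handle(rawKey, rawValue, e);
        }
    }

    // A legacy handler that only implements the old method.
    static class LegacyHandler implements ExceptionHandler {
        @Override
        public Response handle(byte[] rawKey, byte[] rawValue, Exception e) {
            return Response.CONTINUE;
        }
    }
}
```

The runtime can always call the new entry point; legacy handlers are reached through the default delegation, which is the pattern that avoids breaking existing user code.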
>
> (4)
> Where are the mentioned changes to the DeserializationExceptionHandler?
>
> (5)
> To be consistent, the order of the parameters in the
> ProductionExceptionHandler should be:
> 1. context
> 2. record
> 3. exception
>
> (6)
> I am wondering where the implementation of ProcessingMetadata gets the
> sourceRawKey/Value from. Do we need additional changes in
> ProcessingContext and implementations?
>
> Best,
> Bruno
>
> On 4/21/24 2:23 PM, Damien Gasparina wrote:
> > Hi Everyone,
> >
> > Following the discussions on KIP-1033 and KIP-1034, we made a few changes:
> > - We introduced a new ProcessingMetadata class containing only the
> >   ProcessorContext metadata: topic, partition, offset, headers[],
> >   sourceRawKey, sourceRawValue, TaskId, ProcessorNodeName
> > - To be consistent, we propose to deprecate the existing
> >   DeserializationExceptionHandler and ProductionExceptionHandler methods
> >   to rely on the new ProcessingMetadata
> > - The creation of the ProcessingMetadata and the deprecation of the old
> >   methods are owned by KIP-1033; KIP-1034 (DLQ) is now only focusing on
> >   the Dead Letter Queue implementation without touching any interfaces. We
> >   introduced a hard dependency of KIP-1034 on KIP-1033, which we think
> >   is the wisest approach implementation-wise.
> > - Instead of creating a new metric, KIP-1033 updates the
> >   dropped-record metric.
> >
> > Let me know what you think; if everything's fine, I think we should be
> > good to start a VOTE?
> >
> > Cheers,
> > Damien
> >
> > On Fri, 12 Apr 2024 at 22:24, Sophie Blee-Goldman <sop...@responsive.dev>
> > wrote:
> >>
> >> Fully agree about creating a new class for the bits of ProcessingContext
> >> that are specific to metadata only. In fact, more or less this same point
> >> just came up in the related KIP-1034 for DLQs, since the RecordMetadata
> >> can't always be trusted to remain immutable. Maybe it's possible to solve
> >> both issues at once, with the same class?
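The metadata-only class discussed in the thread (later renamed ErrorHandlerContext) would carry roughly the fields Damien lists in his summary: topic, partition, offset, headers, sourceRawKey/Value, task ID, processor node name. A minimal sketch, with simplified types (headers as a plain map, IDs as strings) and a hypothetical name, not the final interface:

```java
// Sketch of a lean, immutable metadata holder matching the field list in
// the thread. Types are deliberately simplified stand-ins: real Kafka
// Headers and TaskId types are replaced with a Map and a String.
record ErrorHandlerContextSketch(
        String topic,
        int partition,
        long offset,
        java.util.Map<String, byte[]> headers,  // stand-in for Kafka Headers
        byte[] sourceRawKey,
        byte[] sourceRawValue,
        String taskId,
        String processorNodeName) {
}
```

Keeping this class independent of Record, as Damien argues above, means a handler (or a DLQ implementation) can retain it without pinning the larger record object in memory.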
> >> > >> On another related note -- I had actually also just proposed that we > >> deprecate the existing DeserializationExceptionHandler method and replace > >> it with one using the new PAPI as part of KIP-1034. But now that I'm > >> reading this, I would say it probably makes more sense to do in this KIP. > >> We can also push that out into a smaller-scoped third KIP if you want, but > >> clearly, there is some overlap here and so however you guys (the authors) > >> want to organize this part of the work is fine with me. I do think it > >> should be done alongside/before this KIP and 1034 though, for all the > >> reasons already stated. > >> > >> Everything else in the discussion so far I agree with! The > >> ProcessingContext thing is the only open question in my mind > >> > >> On Thu, Apr 11, 2024 at 5:41 AM Damien Gasparina <d.gaspar...@gmail.com> > >> wrote: > >> > >>> Hi Matthias, Bruno, > >>> > >>> 1.a During my previous comment, by Processor Node ID, I meant > >>> Processor name. This is important information to expose in the handler > >>> as it allows users to identify the location of the exception in the > >>> topology. > >>> I assume this information could be useful in other places, that's why > >>> I would lean toward adding this as an attribute in the > >>> ProcessingContext. > >>> > >>> 1.b Looking at the ProcessingContext, I do think the following 3 > >>> methods should not be accessible in the exception handlers: > >>> getStateStore(), schedule() and commit(). > >>> Having a separate interface would make a cleaner signature. It would > >>> also be a great time to ensure that all exception handlers are > >>> consistent, at the moment, the > >>> DeserializationExceptionHandler.handle() relies on the old PAPI > >>> ProcessorContext and the ProductionExceptionHandler.handle() has none. 
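Damien's point 1.b is essentially interface segregation: give exception handlers a narrower view than ProcessingContext so they cannot reach getStateStore(), schedule() or commit(). A rough sketch of the shape of that idea, with made-up interface names:

```java
// Made-up names; illustrates the segregation only. The handler-facing view
// exposes read-only metadata, while the full context (used inside
// processors) additionally exposes the operations that should stay out of
// reach of exception handlers.
class ContextSegregationSketch {

    interface HandlerContext {
        String applicationId();
        String taskId();
    }

    interface FullProcessingContext extends HandlerContext {
        void commit();                        // hidden from handlers
        Object getStateStore(String name);    // hidden from handlers
        void schedule(long intervalMs);       // hidden from handlers
    }
}
```

A single implementation can implement FullProcessingContext and be handed to handlers as HandlerContext, so no second object needs to be built on the error path.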
> >>> It could make sense to build the new interface in this KIP and track > >>> the effort to migrate the existing handlers in a separate KIP, what do > >>> you think? > >>> Maybe I am overthinking this part and the ProcessingContext would be fine. > >>> > >>> 4. Good point regarding the dropped-record metric, as it is used by > >>> the other handlers, I do think it makes sense to leverage it instead > >>> of creating a new metric. > >>> I will update the KIP to update the dropped-record-metric. > >>> > >>> 8. Regarding the DSL, I am aligned with Bruno, I think we could close > >>> the gaps in a future KIP. > >>> > >>> Cheers, > >>> Damien > >>> > >>> > >>> On Thu, 11 Apr 2024 at 11:56, Bruno Cadonna <cado...@apache.org> wrote: > >>>> > >>>> Hi Matthias, > >>>> > >>>> > >>>> 1.a > >>>> With processor node ID, I mean the ID that is exposed in the tags of > >>>> processor node metrics. That ID cannot be internal since it is exposed > >>>> in metrics. I think the processor name and the processor node ID is the > >>>> same thing. I followed how the processor node ID is set in metrics and I > >>>> ended up in addProcessor(name, ...). > >>>> > >>>> > >>>> 1.b > >>>> Regarding ProcessingContext, I also thought about a separate class to > >>>> pass-in context information into the handler, but then I dismissed the > >>>> idea because I thought I was overthinking it. Apparently, I was not > >>>> overthinking it if you also had the same idea. So let's consider a > >>>> separate class. > >>>> > >>>> > >>>> 4. > >>>> Regarding the metric, thanks for pointing to the dropped-record metric, > >>>> Matthias. The dropped-record metric is used with the deserialization > >>>> handler and the production handler. So, it would make sense to also use > >>>> it for this handler. However, the dropped-record metric only records > >>>> records that are skipped by the handler and not the number of calls to > >>>> the handler. 
But that difference is probably irrelevant since in case of > >>>> FAIL, the metric will be reset anyways since the stream thread will be > >>>> restarted. In conclusion, I think the dropped-record metric in > >>>> combination with a warn log message might be the better choice to > >>>> introducing a new metric. > >>>> > >>>> > >>>> 8. > >>>> Regarding the DSL, I think we should close possible gaps in a separate > >>> KIP. > >>>> > >>>> > >>>> Best, > >>>> Bruno > >>>> > >>>> On 4/11/24 12:06 AM, Matthias J. Sax wrote: > >>>>> Thanks for the KIP. Great discussion. > >>>>> > >>>>> I am not sure if I understand the proposal from Bruno to hand in the > >>>>> processor node id? Isn't this internal (could not even find it > >>> quickly). > >>>>> We do have a processor name, right? Or do I mix up something? > >>>>> > >>>>> Another question is about `ProcessingContext` -- it contains a lot of > >>>>> (potentially irrelevant?) metadata. We should think carefully about > >>> what > >>>>> we want to pass in and what not -- removing stuff is hard, but adding > >>>>> stuff is easy. It's always an option to create a new interface that > >>> only > >>>>> exposes stuff we find useful, and allows us to evolve this interface > >>>>> independent of others. Re-using an existing interface always has the > >>>>> danger to introduce an undesired coupling that could bite us in the > >>>>> future. -- It make total sense to pass in `RecordMetadata`, but > >>>>> `ProcessingContext` (even if already limited compared to > >>>>> `ProcessorContext`) still seems to be too broad? For example, there is > >>>>> `getStateStore()` and `schedule()` methods which I think we should not > >>>>> expose. > >>>>> > >>>>> The other interesting question is about "what record gets passed in". > >>>>> For the PAPI, passing in the Processor's input record make a lot of > >>>>> sense. However, for DSL operators, I am not 100% sure? 
The DSL often > >>>>> uses internal types not exposed to the user, and thus I am not sure if > >>>>> users could write useful code for this case? -- In general, I still > >>>>> agree that the handler should be implement with a try-catch around > >>>>> `Processor.process()` but it might not be too useful for DSL processor. > >>>>> Hence, I am wondering if we need to so something more in the DSL? I > >>>>> don't have a concrete proposal (a few high level ideas only) and if we > >>>>> don't do anything special for the DSL I am ok with moving forward with > >>>>> this KIP as-is, but we should be aware of potential limitations for DSL > >>>>> users. We can always do a follow up KIP to close gaps when we > >>> understand > >>>>> the impact better -- covering the DSL would also expand the scope of > >>>>> this KIP significantly... > >>>>> > >>>>> About the metric: just to double check. Do we think it's worth to add a > >>>>> new metric? Or could we re-use the existing "dropped record metric"? > >>>>> > >>>>> > >>>>> > >>>>> -Matthias > >>>>> > >>>>> > >>>>> On 4/10/24 5:11 AM, Sebastien Viale wrote: > >>>>>> Hi, > >>>>>> > >>>>>> You are right, it will simplify types. > >>>>>> > >>>>>> We update the KIP > >>>>>> > >>>>>> regards > >>>>>> > >>>>>> Sébastien *VIALE*** > >>>>>> > >>>>>> *MICHELIN GROUP* - InfORMATION Technology > >>>>>> *Technical Expert Kafka* > >>>>>> > >>>>>> Carmes / Bâtiment A17 4e / 63100 Clermont-Ferrand > >>>>>> > >>>>>> > >>> ------------------------------------------------------------------------ > >>>>>> *De :* Bruno Cadonna <cado...@apache.org> > >>>>>> *Envoyé :* mercredi 10 avril 2024 10:38 > >>>>>> *À :* dev@kafka.apache.org <dev@kafka.apache.org> > >>>>>> *Objet :* [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception > >>>>>> handler for exceptions occuring during processing > >>>>>> Warning External sender Do not click on any links or open any > >>>>>> attachments unless you trust the sender and know the content is safe. 
> >>>>>> >
> >>>>>> Hi Loïc, Damien, and Sébastien,
> >>>>>>
> >>>>>> Great that we are converging!
> >>>>>>
> >>>>>> 3.
> >>>>>> Damien and Loïc, I think in your examples the handler will receive
> >>>>>> Record<Object, Object> because a Record<Object, Object> is passed to
> >>>>>> the processor in the following code line:
> >>>>>> https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152
> >>>>>>
> >>>>>> I see that we do not need to pass into the handler a Record<byte[],
> >>>>>> byte[]> just because we do that for the DeserializationExceptionHandler
> >>>>>> and the ProductionExceptionHandler. When those two handlers are called,
> >>>>>> the record is already serialized. This is not the case for the
> >>>>>> ProcessingExceptionHandler. However, I would propose to use Record<?, ?>
> >>>>>> for the record that is passed to the ProcessingExceptionHandler because
> >>>>>> it makes the handler API more flexible.
> >>>>>>
> >>>>>> Best,
> >>>>>> Bruno
> >>>>>>
> >>>>>> This email was screened for spam and malicious content but exercise
> >>>>>> caution anyway.
> >>>>>>
> >>>>>> On 4/9/24 9:09 PM, Loic Greffier wrote:
> >>>>>> > Hi Bruno and Bill,
> >>>>>> >
> >>>>>> > To complete Damien's remarks about point 3.
> >>>>>> >
> >>>>>> > Processing errors are caught and handled by the
> >>>>>> ProcessingExceptionHandler, at the precise moment when records are
> >>>>>> processed by processor nodes. The handling will be performed in the
> >>>>>> "process" method of the ProcessorNode, such as:
> >>>>>> >
> >>>>>> > public void process(final Record<KIn, VIn> record) {
> >>>>>> >     ...
> >>>>>> >     try {
> >>>>>> >         ...
> >>>>>> >     } catch (final ClassCastException e) {
> >>>>>> >         ...
> >>>>>> >     } catch (Exception e) {
> >>>>>> >         ProcessingExceptionHandler.ProcessingHandlerResponse response = this.processingExceptionHandler
> >>>>>> >             .handle(internalProcessorContext, (Record<Object, Object>) record, e);
> >>>>>> >
> >>>>>> >         if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
> >>>>>> >             throw new StreamsException("Processing exception handler is set to fail upon" +
> >>>>>> >                 " a processing error. If you would rather have the streaming pipeline" +
> >>>>>> >                 " continue after a processing error, please set the " +
> >>>>>> >                 DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", e);
> >>>>>> >         }
> >>>>>> >     }
> >>>>>> > }
> >>>>>> >
> >>>>>> > As you can see, the record is transmitted to the
> >>>>>> ProcessingExceptionHandler as a Record<Object, Object>, as we are
> >>>>>> dealing with the input record of the processor at this point. It can
> >>>>>> be any type, including non-serializable types, as suggested by
> >>>>>> Damien's example. As the ProcessingExceptionHandler is not intended to
> >>>>>> perform any serialization, there should be no issue for the users to
> >>>>>> handle a Record<Object, Object>.
> >>>>>> >
> >>>>>> > I follow Damien on the other points.
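Condensed into a self-contained form, the dispatch logic Loïc describes looks roughly like this. The class name, the response enum, the exception type and the generic "step" are stand-ins for the Kafka Streams internals, not actual Streams code:

```java
// Minimal stand-in for the ProcessorNode dispatch described above: run the
// processing step, hand any failure to the configured handler, and escalate
// to a fatal exception only when the handler answers FAIL.
class DispatchSketch {

    enum Response { CONTINUE, FAIL }

    interface Handler {
        Response handle(Object record, Exception e);
    }

    static void process(Object record, Runnable step, Handler handler) {
        try {
            step.run();
        } catch (RuntimeException e) {
            if (handler.handle(record, e) == Response.FAIL) {
                throw new IllegalStateException(
                        "Processing exception handler is set to fail on processing errors", e);
            }
            // CONTINUE: the record is dropped and processing goes on.
        }
    }
}
```

The key property is the one debated in the thread: the record reaches the handler as the processor's untyped input, before any serialization, so the handler never needs serializers.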
> >>>>>> > > >>>>>> > For point 6, underlying public interfaces are renamed as well: > >>>>>> > - The ProcessingHandlerResponse > >>>>>> > - The > >>>>>> > >>> ProcessingLogAndContinueExceptionHandler/ProcessingLogAndFailExceptionHandler > >>>>>> > - The configuration > >>>>>> DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG > >>>>>> (default.processing.exception.handler) > >>>>>> > > >>>>>> > Regards, > >>>>>> > > >>>>>> > Loïc > >>>>>> > > >>>>>> > De : Damien Gasparina <d.gaspar...@gmail.com> > >>>>>> > Envoyé : mardi 9 avril 2024 20:08 > >>>>>> > À : dev@kafka.apache.org > >>>>>> > Objet : Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams > >>>>>> exception handler for exceptions occuring during processing > >>>>>> > > >>>>>> > Warning External sender Do not click on any links or open any > >>>>>> attachments unless you trust the sender and know the content is safe. > >>>>>> > > >>>>>> > Hi Bruno, Bill, > >>>>>> > > >>>>>> > First of all, thanks a lot for all your useful comments. > >>>>>> > > >>>>>> >> 1. and 2. > >>>>>> >> I am wondering whether we should expose the processor node ID -- > >>>>>> which > >>>>>> >> basically is the processor node name -- in the ProcessingContext > >>>>>> >> interface. I think the processor node ID fits well in the > >>>>>> >> ProcessingContext interface since it already contains application > >>>>>> ID and > >>>>>> >> task ID and it would make the API for the handler cleaner. > >>>>>> > > >>>>>> > That's a good point, the actual ProcessorContextImpl is already > >>>>>> holding the > >>>>>> > current node in an attribute (currentNode), thus exposing the node > >>>>>> ID should > >>>>>> > not be a problem. Let me sleep on it and get back to you regarding > >>>>>> this > >>>>>> > point. > >>>>>> > > >>>>>> >> 3. > >>>>>> >> Could you elaborate -- maybe with an example -- when a record is > >>> in a > >>>>>> >> state in which it cannot be serialized? This is not completely > >>>>>> clear to > >>>>>> > me. 
> >>>>>> > > >>>>>> > The Record passed to the handler is the input record to the > >>>>>> processor. In > >>>>>> > the Kafka Streams API, it could be any POJO. > >>>>>> > e.g. with the following topology ` > >>>>>> > streamsBuilder.stream("x") > >>>>>> > .map((k, v) -> new KeyValue("foo", Pair.of("hello", > >>>>>> > "world"))) > >>>>>> > .forEach((k, v) -> throw new RuntimeException()) > >>>>>> > I would expect the handler to receive a Record<String, Pair<String, > >>>>>> > String>>. > >>>>>> > > >>>>>> >> 4. > >>>>>> >> Regarding the metrics, it is not entirely clear to me what the > >>> metric > >>>>>> >> measures. Is it the number of calls to the process handler or is > >>>>>> it the > >>>>>> >> number of calls to process handler that returned FAIL? > >>>>>> >> If it is the former, I was also wondering whether it would be > >>>>>> better to > >>>>>> >> put the task-level metrics to INFO reporting level and remove the > >>>>>> >> thread-level metric, similar to the dropped-records metric. You > >>> can > >>>>>> >> always roll-up the metrics to the thread level in your preferred > >>>>>> >> monitoring system. Or do you think we end up with to many metrics? > >>>>>> > > >>>>>> > We were thinking of the former, measuring the number of calls to > >>> the > >>>>>> > process handler. That's a good point, having the information at the > >>>>>> task > >>>>>> > level could be beneficial. I updated the KIP to change the metric > >>>>>> level > >>>>>> > and to clarify the wording. > >>>>>> > > >>>>>> >> 5. > >>>>>> >> What do you think about naming the handler > >>> ProcessingExceptionHandler > >>>>>> >> instead of ProcessExceptionHandler? > >>>>>> >> The DeserializationExceptionHanlder and the > >>>>>> ProductionExceptionHandler > >>>>>> >> also use the noun of the action in their name and not the verb. > >>>>>> > > >>>>>> > Good catch, I updated the KIP to rename it > >>> ProcessingExceptionHandler. > >>>>>> > > >>>>>> >> 6. 
> >>>>>> >> What record is exactly passed to the handler? > >>>>>> >> Is it the input record to the task? Is it the input record to the > >>>>>> >> processor node? Is it the input record to the processor? > >>>>>> > > >>>>>> > The input record of the processor. I assume that is the most user > >>>>>> > friendly record in this context. > >>>>>> > > >>>>>> >> 7. > >>>>>> >> Could you please add the packages of the Java > >>>>>> classes/interfaces/enums > >>>>>> >> you want to add? > >>>>>> > > >>>>>> > Done, without any surprises: package > >>> org.apache.kafka.streams.errors; > >>>>>> > > >>>>>> > > >>>>>> > Thanks a lot for your reviews! Cheers, > >>>>>> > Damien > >>>>>> > This email was screened for spam and malicious content but exercise > >>>>>> caution anyway. > >>>>>> > > >>>>>> > > >>>>>> > > >>>>>> > > >>>>>> > On Tue, 9 Apr 2024 at 18:04, Bill Bejeck > >>>>>> <bbej...@gmail.com<mailto:bbej...@gmail.com>> wrote: > >>>>>> > > >>>>>> >> Hi Damien, Sebastien and Loic, > >>>>>> >> > >>>>>> >> Thanks for the KIP, this is a much-needed addition. > >>>>>> >> I like the approach of getting the plumbing in for handling > >>> processor > >>>>>> >> errors, allowing users to implement more complex solutions as > >>> needed. > >>>>>> >> > >>>>>> >> Overall how where the KIP Is now LGTM, modulo outstanding > >>> comments. I > >>>>>> >> think adding the example you included in this thread to the KIP is > >>>>>> a great > >>>>>> >> idea. > >>>>>> >> > >>>>>> >> Regarding the metrics, I'm thinking along the same lines as Bruno. > >>>>>> I'm > >>>>>> >> wondering if we can make do with a task-level metric at the INFO > >>>>>> level and > >>>>>> >> the processor metric at DEBUG. IMHO, when it comes to tracking > >>>>>> exceptions > >>>>>> >> in processing, these two areas are where users will want to focus, > >>>>>> higher > >>>>>> >> level metrics wouldn't be as useful in this case. 
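Since the handler receives the processor's input record as an untyped Record<Object, Object> (Damien's answer to point 6 above), a user handler typically downcasts the key and value using knowledge of its own topology. A sketch of what that might look like; Rec and the response enum are hypothetical stand-ins for the Streams types:

```java
// Sketch of a user-side handler downcasting the untyped record. Because the
// handler receives the processor's POJO input, the user relies on knowledge
// of their own topology to interpret key and value.
class CastingHandlerSketch {

    record Rec(Object key, Object value) { }  // stand-in for Record<Object, Object>

    enum Response { CONTINUE, FAIL }

    static Response handle(Rec record, Exception e) {
        if (record.value() instanceof String s) {
            // Known type: log some context and skip the record.
            System.err.println("skipping bad record \"" + s + "\": " + e.getMessage());
            return Response.CONTINUE;
        }
        // Unexpected type: fail fast rather than guess.
        return Response.FAIL;
    }
}
```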
> >>>>>> >> > >>>>>> >> Thanks, > >>>>>> >> Bill > >>>>>> >> > >>>>>> >> On Tue, Apr 9, 2024 at 6:54 AM Bruno Cadonna > >>>>>> <cado...@apache.org<mailto:cado...@apache.org>> wrote: > >>>>>> >> > >>>>>> >>> Hi again, > >>>>>> >>> > >>>>>> >>> I have additional questions/comments. > >>>>>> >>> > >>>>>> >>> 6. > >>>>>> >>> What record is exactly passed to the handler? > >>>>>> >>> Is it the input record to the task? Is it the input record to the > >>>>>> >>> processor node? Is it the input record to the processor? > >>>>>> >>> > >>>>>> >>> > >>>>>> >>> 7. > >>>>>> >>> Could you please add the packages of the Java > >>>>>> classes/interfaces/enums > >>>>>> >>> you want to add? > >>>>>> >>> > >>>>>> >>> > >>>>>> >>> Best, > >>>>>> >>> Bruno > >>>>>> >>> > >>>>>> >>> > >>>>>> >>> On 4/9/24 10:17 AM, Bruno Cadonna wrote: > >>>>>> >>>> Hi Loïc, Damien, and Sébastien, > >>>>>> >>>> > >>>>>> >>>> Thanks for the KIP! > >>>>>> >>>> I find it really great that you contribute back to Kafka Streams > >>>>>> >>>> concepts you developed for kstreamplify so that everybody can > >>> take > >>>>>> >>>> advantage from your improvements. > >>>>>> >>>> > >>>>>> >>>> I have a couple of questions/comments: > >>>>>> >>>> > >>>>>> >>>> 1. and 2. > >>>>>> >>>> I am wondering whether we should expose the processor node ID -- > >>>>>> which > >>>>>> >>>> basically is the processor node name -- in the ProcessingContext > >>>>>> >>>> interface. I think the processor node ID fits well in the > >>>>>> >>>> ProcessingContext interface since it already contains > >>>>>> application ID > >>>>>> >> and > >>>>>> >>>> task ID and it would make the API for the handler cleaner. > >>>>>> >>>> > >>>>>> >>>> > >>>>>> >>>> 3. > >>>>>> >>>> Could you elaborate -- maybe with an example -- when a record is > >>>>>> in a > >>>>>> >>>> state in which it cannot be serialized? This is not completely > >>>>>> clear to > >>>>>> >>> me. > >>>>>> >>>> > >>>>>> >>>> > >>>>>> >>>> 4. 
> >>>>>> >>>> Regarding the metrics, it is not entirely clear to me what the > >>>>>> metric > >>>>>> >>>> measures. Is it the number of calls to the process handler or is > >>>>>> it the > >>>>>> >>>> number of calls to process handler that returned FAIL? > >>>>>> >>>> If it is the former, I was also wondering whether it would be > >>>>>> better to > >>>>>> >>>> put the task-level metrics to INFO reporting level and remove > >>> the > >>>>>> >>>> thread-level metric, similar to the dropped-records metric. You > >>> can > >>>>>> >>>> always roll-up the metrics to the thread level in your preferred > >>>>>> >>>> monitoring system. Or do you think we end up with to many > >>> metrics? > >>>>>> >>>> > >>>>>> >>>> > >>>>>> >>>> 5. > >>>>>> >>>> What do you think about naming the handler > >>>>>> ProcessingExceptionHandler > >>>>>> >>>> instead of ProcessExceptionHandler? > >>>>>> >>>> The DeserializationExceptionHanlder and the > >>>>>> ProductionExceptionHandler > >>>>>> >>>> also use the noun of the action in their name and not the verb. > >>>>>> >>>> > >>>>>> >>>> > >>>>>> >>>> Best, > >>>>>> >>>> Bruno > >>>>>> >>>> > >>>>>> >>>> > >>>>>> >>>> On 4/8/24 3:48 PM, Sebastien Viale wrote: > >>>>>> >>>>> Thanks for your review! > >>>>>> >>>>> > >>>>>> >>>>> All the points make sense for us! > >>>>>> >>>>> > >>>>>> >>>>> > >>>>>> >>>>> > >>>>>> >>>>> We updated the KIP for points 1 and 4. > >>>>>> >>>>> > >>>>>> >>>>> > >>>>>> >>>>> > >>>>>> >>>>> 2/ We followed the DeserializationExceptionHandler interface > >>>>>> >>>>> signature, it was not on our mind that the record be forwarded > >>>>>> with > >>>>>> >>>>> the ProcessorContext. > >>>>>> >>>>> > >>>>>> >>>>> The ProcessingContext is sufficient, we do expect that most > >>> people > >>>>>> >>>>> would need to access the RecordMetadata. 
> >>>>>> >>>>> > >>>>>> >>>>> > >>>>>> >>>>> > >>>>>> >>>>> 3/ The use of Record<Object, Object> is required, as the error > >>>>>> could > >>>>>> >>>>> occurred in the middle of a processor where records could be > >>> non > >>>>>> >>>>> serializable objects > >>>>>> >>>>> > >>>>>> >>>>> As it is a global error catching, the user may need little > >>>>>> >>>>> information about the faulty record. > >>>>>> >>>>> > >>>>>> >>>>> Assuming that users want to make some specific treatments to > >>> the > >>>>>> >>>>> record, they can add a try / catch block in the topology. > >>>>>> >>>>> > >>>>>> >>>>> It is up to users to cast record value and key in the > >>>>>> implementation > >>>>>> >>>>> of the ProcessorExceptionHandler. > >>>>>> >>>>> > >>>>>> >>>>> > >>>>>> >>>>> > >>>>>> >>>>> Cheers > >>>>>> >>>>> > >>>>>> >>>>> Loïc, Damien and Sébastien > >>>>>> >>>>> > >>>>>> >>>>> ________________________________ > >>>>>> >>>>> De : Sophie Blee-Goldman > >>>>>> <sop...@responsive.dev<mailto:sop...@responsive.dev>> > >>>>>> >>>>> Envoyé : samedi 6 avril 2024 01:08 > >>>>>> >>>>> À : dev@kafka.apache.org<mailto:dev@kafka.apache.org> > >>>>>> <dev@kafka.apache.org<mailto:dev@kafka.apache.org>> > >>>>>> >>>>> Objet : [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams > >>> exception > >>>>>> >>>>> handler for exceptions occuring during processing > >>>>>> >>>>> > >>>>>> >>>>> Warning External sender Do not click on any links or open any > >>>>>> >>>>> attachments unless you trust the sender and know the content is > >>>>>> safe. > >>>>>> >>>>> > >>>>>> >>>>> Hi Damien, > >>>>>> >>>>> > >>>>>> >>>>> First off thanks for the KIP, this is definitely a much needed > >>>>>> >>>>> feature. On > >>>>>> >>>>> the > >>>>>> >>>>> whole it seems pretty straightforward and I am in favor of the > >>>>>> >> proposal. > >>>>>> >>>>> Just > >>>>>> >>>>> a few questions and suggestions here and there: > >>>>>> >>>>> > >>>>>> >>>>> 1. 
One of the #handle method's parameters is "ProcessorNode > >>>>>> node", but > >>>>>> >>>>> ProcessorNode is an internal class (and would expose a lot of > >>>>>> >> internals > >>>>>> >>>>> that we probably don't want to pass in to an exception > >>>>>> handler). Would > >>>>>> >>> it > >>>>>> >>>>> be sufficient to just make this a String and pass in the > >>> processor > >>>>>> >> name? > >>>>>> >>>>> > >>>>>> >>>>> 2. Another of the parameters in the ProcessorContext. This > >>> would > >>>>>> >> enable > >>>>>> >>>>> the handler to potentially forward records, which imo should > >>>>>> not be > >>>>>> >> done > >>>>>> >>>>> from the handler since it could only ever call #forward but not > >>>>>> direct > >>>>>> >>>>> where > >>>>>> >>>>> the record is actually forwarded to, and could cause confusion > >>> if > >>>>>> >> users > >>>>>> >>>>> aren't aware that the handler is effectively calling from the > >>>>>> context > >>>>>> >>>>> of the > >>>>>> >>>>> processor that threw the exception. > >>>>>> >>>>> 2a. If you don't explicitly want the ability to forward > >>> records, I > >>>>>> >> would > >>>>>> >>>>> suggest changing the type of this parameter to > >>> ProcessingContext, > >>>>>> >> which > >>>>>> >>>>> has all the metadata and useful info of the ProcessorContext > >>> but > >>>>>> >> without > >>>>>> >>>>> the > >>>>>> >>>>> forwarding APIs. This would also lets us sidestep the following > >>>>>> issue: > >>>>>> >>>>> 2b. If you *do* want the ability to forward records, setting > >>> aside > >>>>>> >>>>> whether > >>>>>> >>>>> that > >>>>>> >>>>> in of itself makes sense to do, we would need to pass in > >>> either a > >>>>>> >>> regular > >>>>>> >>>>> ProcessorContext or a FixedKeyProcessorContext, depending on > >>>>>> what kind > >>>>>> >>>>> of processor it is. 
I'm not quite sure how we could design a > >>>>>> clean API > >>>>>> >>>>> here, > >>>>>> >>>>> so I'll hold off until you clarify whether you even want > >>>>>> forwarding or > >>>>>> >>>>> not. > >>>>>> >>>>> We would also need to split the input record into a Record vs > >>>>>> >>>>> FixedKeyRecord > >>>>>> >>>>> > >>>>>> >>>>> 3. One notable difference between this handler and the existing > >>>>>> ones > >>>>>> >> you > >>>>>> >>>>> pointed out, the Deserialization/ProductionExceptionHandler, is > >>>>>> that > >>>>>> >> the > >>>>>> >>>>> records passed in to those are in serialized bytes, whereas the > >>>>>> record > >>>>>> >>>>> here would be POJOs. You account for this by making the > >>> parameter > >>>>>> >>>>> type a Record<Object, Object>, but I just wonder how users > >>>>>> would be > >>>>>> >>>>> able to read the key/value and figure out what type it should > >>>>>> be. For > >>>>>> >>>>> example, would they need to maintain a map from processor name > >>> to > >>>>>> >>>>> input record types? > >>>>>> >>>>> > >>>>>> >>>>> If you could provide an example of this new feature in the > >>> KIP, it > >>>>>> >>>>> would be > >>>>>> >>>>> very helpful in understanding whether we could do something to > >>>>>> make it > >>>>>> >>>>> easier for users to use, for if it would be fine as-is > >>>>>> >>>>> > >>>>>> >>>>> 4. We should include all the relevant info for a new metric, > >>>>>> such as > >>>>>> >> the > >>>>>> >>>>> metric > >>>>>> >>>>> group and recording level. You can look at other metrics KIPs > >>> like > >>>>>> >>>>> KIP-444 > >>>>>> >>>>> and KIP-613 for an example. I suspect you intend for this to be > >>>>>> in the > >>>>>> >>>>> processor group and at the INFO level? > >>>>>> >>>>> > >>>>>> >>>>> Hope that all makes sense! 
Thanks again for the KIP
> >>>>>> >>>>>
> >>>>>> >>>>> -Sophie
> >>>>>> >>>>>
> >>>>>> >>>>> On Fri, Mar 29, 2024 at 6:16 AM Damien Gasparina <d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com>> wrote:
> >>>>>> >>>>>
> >>>>>> >>>>>> Hi everyone,
> >>>>>> >>>>>>
> >>>>>> >>>>>> After writing quite a few Kafka Streams applications, my colleagues and I
> >>>>>> >>>>>> just created KIP-1033 to introduce a new Exception Handler in Kafka Streams
> >>>>>> >>>>>> to simplify error handling.
> >>>>>> >>>>>> This feature would allow defining an exception handler to automatically
> >>>>>> >>>>>> catch exceptions occurring during the processing of a message.
> >>>>>> >>>>>>
> >>>>>> >>>>>> KIP link:
> >>>>>> >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> >>>>>> >>>>>>
> >>>>>> >>>>>> Feedbacks and suggestions are welcome,
> >>>>>> >>>>>>
> >>>>>> >>>>>> Cheers,
> >>>>>> >>>>>> Damien, Sebastien and Loic
> >>>>>> >>>>>
> >>>>>> >>>>> This email was screened for spam and malicious content but exercise
> >>>>>> >>>>> caution anyway.