Hello all, I've updated the KIP based on this conversation, and made it so that its interface, config setting, and parameters line up more closely with the interface in KIP-161 (deserialization handler).
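To make the shape concrete, here is a rough, self-contained Java sketch of a handler along the lines of the handle(record, exception) signature discussed in this thread. It is only an illustration under stated assumptions: `SerializedRecord` is a hypothetical stand-in for `ProducerRecord<byte[], byte[]>` so the sketch compiles without the Kafka client jar, the `HandlerResponse` values CONTINUE and FAIL are assumed by analogy with KIP-161, and the interface and handler class names are made up. The config key is the name floated in this thread, not a confirmed setting.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class ProductionHandlerSketch {

    /** Hypothetical stand-in for ProducerRecord<byte[], byte[]> (already serialized). */
    static final class SerializedRecord {
        final String topic;
        final byte[] key;   // may be null
        final byte[] value;

        SerializedRecord(String topic, byte[] key, byte[] value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    /** Assumed response values, by analogy with KIP-161's CONTINUE/FAIL. */
    enum HandlerResponse { CONTINUE, FAIL }

    /** Assumed interface shape: handle(record, exception) returning a response. */
    interface ProductionHandler {
        HandlerResponse handle(SerializedRecord record, Exception exception);
    }

    /** Strategy 1: fail fast and stop processing. */
    static final class FailFastHandler implements ProductionHandler {
        @Override
        public HandlerResponse handle(SerializedRecord record, Exception exception) {
            return HandlerResponse.FAIL;
        }
    }

    /** Strategy 2: log the error, count the dropped record, and keep going. */
    static final class LogAndContinueHandler implements ProductionHandler {
        int dropped = 0;

        @Override
        public HandlerResponse handle(SerializedRecord record, Exception exception) {
            int valueBytes = record.value == null ? 0 : record.value.length;
            System.err.println("Dropping record for topic " + record.topic
                    + " (" + valueBytes + " value bytes): " + exception.getMessage());
            dropped++;
            return HandlerResponse.CONTINUE;
        }
    }

    public static void main(String[] args) {
        SerializedRecord record = new SerializedRecord(
                "output-topic", null, "payload".getBytes(StandardCharsets.UTF_8));
        Exception failure = new RuntimeException("simulated send failure");

        System.out.println(new FailFastHandler().handle(record, failure));
        System.out.println(new LogAndContinueHandler().handle(record, failure));

        // Config key as floated in this thread; treat it as an assumption.
        Properties props = new Properties();
        props.setProperty("default.production.exception.handler",
                LogAndContinueHandler.class.getName());
        System.out.println(props.getProperty("default.production.exception.handler"));
    }
}
```

Forwarding failed records to a separate "error" topic would need access to a producer instance inside the handler, so that case is left out of this sketch.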

I believe there are a few specific questions I need to reply to:

> The question I had about then handle parameters are around the record,
> should it be `ProducerRecord<byte[], byte[]>`, or be generics of
> `ProducerRecord<? extends K, ? extends V>` or `ProducerRecord<? extends
> Object, ? extends Object>`?

At this point in the code we're guaranteed that this is a ProducerRecord<byte[], byte[]>, so the generics would just make it harder to work with the key and value.

> Also, should the handle function include the `RecordMetadata` as well in
> case it is not null?

Please correct me if I'm wrong, but my understanding is that the record metadata is always null if an exception occurred while trying to produce.

> We may probably try to write down at least the following handling logic and
> see if the given API is sufficient for it

I've added some examples to the KIP. Let me know what you think.

Cheers,
Matt

On Mon, Oct 23, 2017 at 9:00 PM Matt Farmer <m...@frmr.me> wrote:

> Thanks for this feedback. I’m at a conference right now and am planning on
> updating the KIP again with details from this conversation later this week.
>
> I’ll shoot you a more detailed response then! :)
> On Mon, Oct 23, 2017 at 8:16 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Thanks for the KIP Matt.
>>
>> Regarding the handle interface of ProductionExceptionHandlerResponse,
>> could you write it on the wiki also, along with the actual added config
>> names (e.g. what
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers
>> described).
>>
>> The question I had about then handle parameters are around the record,
>> should it be `ProducerRecord<byte[], byte[]>`, or be generics of
>> `ProducerRecord<? extends K, ? extends V>` or `ProducerRecord<? extends
>> Object, ? extends Object>`?
>>
>> Also, should the handle function include the `RecordMetadata` as well in
>> case it is not null?
>>
>> We may probably try to write down at least the following handling logic
>> and see if the given API is sufficient for it: 1) throw exception
>> immediately to fail fast and stop the world, 2) log the error and drop
>> record and proceed silently, 3) send such errors to a specific "error"
>> Kafka topic, or record it as an app-level metrics
>> (https://kafka.apache.org/documentation/#kafka_streams_monitoring) for
>> monitoring.
>>
>> Guozhang
>>
>> On Fri, Oct 20, 2017 at 5:47 PM, Matt Farmer <m...@frmr.me> wrote:
>>
>> > I did some more digging tonight.
>> >
>> > @Ted: It looks like the deserialization handler uses
>> > "default.deserialization.exception.handler" for the config name. No
>> > ".class" on the end. I'm inclined to think this should use
>> > "default.production.exception.handler".
>> >
>> > On Fri, Oct 20, 2017 at 8:22 PM Matt Farmer <m...@frmr.me> wrote:
>> >
>> > > Okay, I've dug into this a little bit.
>> > >
>> > > I think getting access to the serialized record is possible, and
>> > > changing the naming and return type is certainly doable. However,
>> > > because we're hooking into the onCompletion callback we have no
>> > > guarantee that the ProcessorContext state hasn't changed by the time
>> > > this particular handler runs. So I think the signature would change
>> > > to something like:
>> > >
>> > > ProductionExceptionHandlerResponse handle(final ProducerRecord<..> record,
>> > > final Exception exception)
>> > >
>> > > Would this be acceptable?
>> > >
>> > > On Thu, Oct 19, 2017 at 7:33 PM Matt Farmer <m...@frmr.me> wrote:
>> > >
>> > >> Ah good idea. Hmmm. I can line up the naming and return type but I’m
>> > >> not sure if I can get my hands on the context and the record itself
>> > >> without other changes.
>> > >>
>> > >> Let me dig in and follow up here tomorrow.
>> > >> On Thu, Oct 19, 2017 at 7:14 PM Matthias J. Sax <matth...@confluent.io>
>> > >> wrote:
>> > >>
>> > >>> Thanks for the KIP.
>> > >>>
>> > >>> Are you familiar with KIP-161?
>> > >>>
>> > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers
>> > >>>
>> > >>> I thinks, we should align the design (parameter naming, return types,
>> > >>> class names etc) of KIP-210 to KIP-161 to get a unified user
>> > >>> experience.
>> > >>>
>> > >>> -Matthias
>> > >>>
>> > >>> On 10/18/17 4:20 PM, Matt Farmer wrote:
>> > >>> > I’ll create the JIRA ticket.
>> > >>> >
>> > >>> > I think that config name will work. I’ll update the KIP accordingly.
>> > >>> > On Wed, Oct 18, 2017 at 6:09 PM Ted Yu <yuzhih...@gmail.com> wrote:
>> > >>> >
>> > >>> >> Can you create JIRA that corresponds to the KIP ?
>> > >>> >>
>> > >>> >> For the new config, how about naming it
>> > >>> >> production.exception.processor.class
>> > >>> >> ? This way it is clear that class name should be specified.
>> > >>> >>
>> > >>> >> Cheers
>> > >>> >>
>> > >>> >> On Wed, Oct 18, 2017 at 2:40 PM, Matt Farmer <m...@frmr.me> wrote:
>> > >>> >>
>> > >>> >>> Hello everyone,
>> > >>> >>>
>> > >>> >>> This is the discussion thread for the KIP that I just filed here:
>> > >>> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce
>> > >>> >>>
>> > >>> >>> Looking forward to getting some feedback from folks about this idea
>> > >>> >>> and working toward a solution we can contribute back. :)
>> > >>> >>>
>> > >>> >>> Cheers,
>> > >>> >>> Matt Farmer
>> > >>> >>>
>> > >>> >>
>> > >>> >
>> > >>>
>> > >>
>>
>> --
>> -- Guozhang