Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing
Thanks for the feedback. I think we should keep two separate callbacks for serialization errors and for other production errors. It makes sense for type safety (ProducerRecord<byte[], byte[]> vs. POJO) and for backward compatibility. On top of that, all metadata provided in the #handle method would need to be held in memory until the producer invokes its callback. In the future, having two callbacks might also avoid confusion, as some metadata might be provided to #handle and not to #handleSerializationException. I do think that one method would be cleaner, but for backward compatibility, type safety, and memory reasons, we should keep two separate callbacks.

As you suggested Sophie, I updated the KIP to introduce a SerializationExceptionOrigin enum and added the "origin" parameter to the #handleSerializationException method.

On Sat, 11 May 2024 at 07:30, Sophie Blee-Goldman wrote:
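For reference, here is a minimal, self-contained sketch of the two-callback shape with the "origin" parameter being discussed. ErrorHandlerContext and ProducerRecord are stubbed stand-ins so the snippet compiles without Kafka on the classpath; the actual KIP signatures may differ.

```java
// Sketch of the two-callback ProductionExceptionHandler described above.
// ErrorHandlerContext and ProducerRecord are stubs, NOT the real Kafka types.
class TwoCallbackSketch {
    interface ErrorHandlerContext { }                 // stand-in for the KIP's context type
    record ProducerRecord<K, V>(K key, V value) { }   // stand-in for the client class

    enum SerializationExceptionOrigin { KEY, VALUE }
    enum ProductionExceptionHandlerResponse { CONTINUE, FAIL }

    interface ProductionExceptionHandler {
        // Send/production errors: the record is already serialized to bytes.
        ProductionExceptionHandlerResponse handle(ErrorHandlerContext context,
                ProducerRecord<byte[], byte[]> record, Exception exception);

        // Serialization errors: the record still holds POJOs, and `origin`
        // says whether the key or the value failed to serialize.
        ProductionExceptionHandlerResponse handleSerializationException(ErrorHandlerContext context,
                ProducerRecord<?, ?> record, Exception exception,
                SerializationExceptionOrigin origin);
    }

    // A trivial handler: fail on production errors and bad keys, skip bad values.
    static final ProductionExceptionHandler HANDLER = new ProductionExceptionHandler() {
        @Override
        public ProductionExceptionHandlerResponse handle(ErrorHandlerContext c,
                ProducerRecord<byte[], byte[]> r, Exception e) {
            return ProductionExceptionHandlerResponse.FAIL;
        }
        @Override
        public ProductionExceptionHandlerResponse handleSerializationException(ErrorHandlerContext c,
                ProducerRecord<?, ?> r, Exception e, SerializationExceptionOrigin origin) {
            return origin == SerializationExceptionOrigin.KEY
                    ? ProductionExceptionHandlerResponse.FAIL
                    : ProductionExceptionHandlerResponse.CONTINUE;
        }
    };

    public static void main(String[] args) {
        System.out.println(HANDLER.handleSerializationException(
                null, new ProducerRecord<>("k", "v"), new RuntimeException("boom"),
                SerializationExceptionOrigin.VALUE)); // prints "CONTINUE"
    }
}
```

Note that each callback gets the record at its natural type, which is the type-safety argument made later in this thread.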
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing
Whoops, just noticed there is already a voting thread for this. Hard to keep track with all the KIPs going on right now!

In that case I'll just wait for the SerializationExceptionOrigin thing to be added and then I'll vote. Should definitely be able to make 3.8 in this case :D

On Fri, May 10, 2024 at 10:10 PM Sophie Blee-Goldman wrote:
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing
Sounds like we're more or less in agreement here. I think the KIP just needs one small update still, which is the SerializationExceptionOrigin enum.

As discussed, there are a few options for this and we're all happy to defer to the preference of the KIP authors, but if we keep the KIP as-is with the two separate methods in the ProductionExceptionHandler, then imo it makes the most sense to add the SerializationExceptionOrigin enum to the ProductionExceptionHandler interface and then add an "origin" parameter to the new #handleSerializationException method. However you decide to do it, I'm personally happy to vote on this KIP once the KIP is updated.

Just FYI, the 3.8 KIP freeze is this upcoming Wednesday, so if you would like to target 3.8 for this feature, make sure to update the KIP and kick off a [VOTE] thread by EOD Monday so that you can close the vote by EOD Wednesday (since it has to be open for 72 hours).

Thanks again for this sorely needed feature!

On Fri, May 10, 2024 at 10:06 AM Bill Bejeck wrote:
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing
Great KIP discussion so far by everyone. At this point, I'm in agreement with the direction and current state of the KIP.

As for having two separate callbacks for the ProductionExceptionHandler, I'm somewhat split in that I agree with points raised by both Sophie and Matthias, with my final position being to maintain both callbacks. IMHO, while there are several things that could go wrong with producing a message, it seems that serialization exceptions would be the most common, although I don't have any data to back that opinion up. Having said that, should the KIP authors decide otherwise, I would be fine with that approach as well.

I'm at the point where I'm comfortable voting for this KIP.

Thanks,
Bill

On Thu, May 9, 2024 at 4:28 PM Sophie Blee-Goldman wrote:
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing
The type safety issue is definitely not solved by having two separate callbacks; I just think it gets a bit worse by mashing them into one method. At least in the plain #handle method you can be sure that the type is ProducerRecord<byte[], byte[]>, and in #handleSerializationException the type is some POJO.

And in theory you can just embed the mapping of sink topics to type/Serde based on your topology. Or let's say your output record keys & values are all Strings, and you want to print the String representation in your handler, rather than the bytes. Having a separate callback means knowing you can simply print the ProducerRecord's key/value in the #handleSerializationException method, whereas you would have to use a StringDeserializer to convert the key/value back to its String form to print it in the #handle method.

Again, I just feel this will be more straightforward and easy for users to use correctly, but am satisfied either way. I'll shut up now and wait for the KIP authors to make a call on this one way or another, and then I'm happy to cast my vote.

On Thu, May 9, 2024 at 10:15 AM Matthias J. Sax wrote:
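The bytes-vs-POJO point above can be sketched concretely. The StringDeserializer is stubbed here as a plain UTF-8 decode so the snippet runs without Kafka on the classpath; the real class is org.apache.kafka.common.serialization.StringDeserializer.

```java
import java.nio.charset.StandardCharsets;

// Illustration of the point above: in the plain #handle callback the record
// carries serialized bytes, so printing the String form means deserializing
// first; a dedicated serialization callback would receive the original String.
class BytesVsPojo {
    // Stand-in for org.apache.kafka.common.serialization.StringDeserializer.
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // What #handle sees: an already-serialized key.
        byte[] serializedKey = "user-42".getBytes(StandardCharsets.UTF_8);
        // To log the human-readable key we must convert back ourselves:
        System.out.println("failed key = " + deserialize(serializedKey)); // prints "failed key = user-42"
    }
}
```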
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing
Thanks Sophie! Makes it much clearer where you are coming from.

About the type unsafety: isn't this also an issue for the `handleSerializationException` case? Because the handler is used for all sink topics, the key/value types are not really known without taking the sink topic into account -- so I am not sure that having two handler methods really helps much with regard to type safety.

Just want to make this small comment for completeness. Let's hear what others think. Given that we both don't have a strong opinion but just a personal preference, we should be able to come to a conclusion quickly and get this KIP approved for 3.8 :)

-Matthias

On 5/8/24 3:12 PM, Sophie Blee-Goldman wrote:
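Matthias's caveat -- that even the serialization callback sees effectively Object-typed keys/values across all sink topics -- is what Sophie's "embed the mapping of sink topics to type/Serde" suggestion addresses. A hedged sketch of what such a mapping could look like (topic names and value types are invented for illustration):

```java
import java.util.Map;

// Hypothetical sketch: a handler helper that recovers per-topic types by
// consulting a topic -> expected-class map derived from the topology.
// Topic names and value types here are invented for illustration.
class PerTopicTypeSketch {
    static final Map<String, Class<?>> SINK_VALUE_TYPES = Map.of(
            "orders-out", String.class,
            "counts-out", Long.class);

    // Returns a printable form of the (unserialized) value if the sink topic's
    // expected type matches, else a fallback description.
    static String describeValue(String topic, Object value) {
        Class<?> expected = SINK_VALUE_TYPES.get(topic);
        if (expected != null && expected.isInstance(value)) {
            return expected.getSimpleName() + ": " + value;
        }
        return "unknown type: " + (value == null ? "null" : value.getClass().getName());
    }

    public static void main(String[] args) {
        System.out.println(describeValue("orders-out", "order-123")); // prints "String: order-123"
        System.out.println(describeValue("counts-out", "oops"));      // prints "unknown type: java.lang.String"
    }
}
```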
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing
Well I definitely don't feel super strongly about it, and more importantly, I'm not a user. So I will happily defer to the preference of anyone who will actually be using this feature. But I'll explain my reasoning:

There *is* a relevant distinction between these two callbacks -- because the passed-in record will have a different type depending on whether it was a serialization exception or something else. Even if we combined them into a single #handle method, users will still end up implementing two distinct branches depending on whether it was a serialization exception or not, since that determines the type of the ProducerRecord passed in.

Not to mention they'll need to cast it to a ProducerRecord<byte[], byte[]> when we could have just passed it in as this type via a dedicated callback. And note that because of the generics, they can't do an instanceof check to make sure that the record type is ProducerRecord<byte[], byte[]> and will have to suppress the "unchecked cast" warning.

So if we combined the two callbacks, their handler will look something like this:

@SuppressWarnings("unchecked")
public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                 final ProducerRecord<?, ?> record,
                                                 final Exception exception) {
    if (exception instanceof SerializationException) {
        if (exception.origin().equals(KEY)) {
            log.error("Failed to serialize key", exception);
        } else {
            log.error("Failed to serialize value", exception);
        }
    } else {
        final ProducerRecord<byte[], byte[]> serializedRecord =
            (ProducerRecord<byte[], byte[]>) record;
        log.error("Failed to produce record with serialized key={} and serialized value={}",
            serializedRecord.key(), serializedRecord.value());
    }
    return ProductionExceptionHandlerResponse.FAIL;
}

That seems like the most basic case, and it still has distinct logic even if they ultimately handle exceptions the same way.
And looking forward to KIP-1034: Dead-letter queues, it seems all the more likely that the actual handling response might be different depending on whether it's a serialization exception or not: a serialized record can probably be retried/sent to a DLQ, whereas a record that can't be serialized should not (can't, really) be forwarded to a DLQ. So if they're going to have completely different implementations depending on whether it's a serialization exception, why not just give them two separate callbacks? And that's all assuming the user is perfectly aware of the different exception types and their implications for the type of the ProducerRecord. Many people might just miss the existence of the RecordSerializationException altogether -- there are already so many different exception types, ESPECIALLY when it comes to the Producer. Not to mention they'll need to understand the nuances of how the ProducerRecord type changes depending on the type of exception that's passed in. And on top of all that, they'll need to know that there is metadata stored in the RecordSerializationException regarding the origin of the error. Whereas if we just passed in the SerializationExceptionOrigin to a #handleSerializationException callback, well, that's pretty impossible to miss. That all just seems like a lot for most people to have to understand to implement a ProductionExceptionHandler, which imo is not at all an advanced feature and should be as straightforward and easy to use as possible. Lastly -- I don't think it's quite fair to compare this to the RecordDeserializationException. We have a dedicated handler that's just for deserialization exceptions specifically, hence there's no worry about users having to be aware of the different exception types they might have to deal with in the DeserializationExceptionHandler. Whereas serialization exceptions are just a subset of what might get passed in to the ProductionExceptionHandler...
Just explaining my reasoning -- in the end I leave it up to the KIP authors and anyone who will actually be using this feature in their applications :) On Tue, May 7, 2024 at 8:35 PM Matthias J. Sax wrote: > @Loic, yes, what you describe is exactly what I had in mind. > > > > @Sophie, can you elaborate a little bit? > > > First of all, I agree that it makes sense to maintain the two separate > > callbacks for the ProductionExceptionHandler, since one of them is > > specifically for serialization exceptions while the other is used for > > everything/anything else. > > What makes a serialization exception special compared to other errors > that it's valuable to treat it differently? Why can we put "everything > else" into a single bucket? By your train of thought, should we not split > out the "everything else" bucket into a different callback method for > every different error? If no, why not, but only for serialization errors? > > From what I believe to remember, historically, we added the > ProductionExceptionHandler, and kinda just missed the serialization > error case. And later, when we extended the handler we just could not > re-use the existing callback as it was typed with `<byte[], byte[]>` and > it would have been an incompatible change
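To make the trade-off above concrete, here is a compilable sketch of the two-callback shape being argued for, with the serialization origin passed as an explicit parameter. All of the types below (Handler, SerializedRecord, UnserializedRecord, Origin, Response) are simplified stand-ins invented for illustration, not the actual Kafka Streams interfaces or signatures.

```java
// Simplified stand-ins for the KIP's two-callback ProductionExceptionHandler
// shape; names and signatures are illustrative only.
class TwoCallbackDemo {
    enum Origin { KEY, VALUE }
    enum Response { CONTINUE, FAIL }

    record SerializedRecord(byte[] key, byte[] value) {}   // post-serialization
    record UnserializedRecord(Object key, Object value) {} // original POJOs

    interface Handler {
        // Production/send errors: the record is already serialized, so it
        // could, for example, be forwarded to a dead-letter queue.
        Response handle(SerializedRecord record, Exception exception);

        // Serialization errors: the record still holds the original POJOs and
        // the origin parameter says whether key or value serialization failed,
        // so no instanceof checks or unchecked casts are needed.
        Response handleSerializationException(UnserializedRecord record,
                                              Exception exception,
                                              Origin origin);
    }

    static class LoggingHandler implements Handler {
        @Override
        public Response handle(SerializedRecord record, Exception exception) {
            System.out.println("produce failed, key bytes=" + record.key().length);
            return Response.CONTINUE;
        }

        @Override
        public Response handleSerializationException(UnserializedRecord record,
                                                     Exception exception,
                                                     Origin origin) {
            System.out.println("failed to serialize " + origin);
            return Response.FAIL; // nothing serializable to send to a DLQ
        }
    }

    public static void main(String[] args) {
        Handler h = new LoggingHandler();
        h.handle(new SerializedRecord(new byte[]{1}, new byte[]{2}),
                 new RuntimeException("delivery timeout"));
        h.handleSerializationException(new UnserializedRecord("k", "v"),
                 new RuntimeException("bad schema"), Origin.KEY);
    }
}
```

Because each branch is a separate override, the compiler enforces the correct record type per error class, which is the type-safety argument made above.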
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing
@Loic, yes, what you describe is exactly what I had in mind. @Sophie, can you elaborate a little bit? First of all, I agree that it makes sense to maintain the two separate callbacks for the ProductionExceptionHandler, since one of them is specifically for serialization exceptions while the other is used for everything/anything else. What makes a serialization exception special compared to other errors that it's valuable to treat it differently? Why can we put "everything else" into a single bucket? By your train of thought, should we not split out the "everything else" bucket into a different callback method for every different error? If no, why not, but only for serialization errors? From what I believe to remember, historically, we added the ProductionExceptionHandler, and kinda just missed the serialization error case. And later, when we extended the handler we just could not re-use the existing callback as it was typed with `<byte[], byte[]>` and it would have been an incompatible change; so it was rather a workaround to add the second method to the handler, but not really the intended design? It's of course only my personal opinion that I believe a single callback method is simpler/cleaner compared to sticking with two, and adding the new exception type to make it backward compatible seems worth it. It also kinda introduces the same pattern we use elsewhere (cf. KIP-1036), which I actually think is an argument for introducing `RecordSerializationException`, to unify the user experience across the board. Would be great to hear from others about this point. It's not that I strongly object to having two methods, and I would not block this KIP on this question. -Matthias On 5/7/24 3:40 PM, Sophie Blee-Goldman wrote: First of all, I agree that it makes sense to maintain the two separate callbacks for the ProductionExceptionHandler, since one of them is specifically for serialization exceptions while the other is used for everything/anything else.
I also think we can take advantage of this fact to simplify things a bit and cut down on the amount of new stuff added to the API by just adding a parameter to the #handleSerializationException callback and use that to pass in the SerializationExceptionOrigin enum to distinguish between key vs value. This way we wouldn't need to introduce yet another exception type (the RecordSerializationException) just to pass in this information. Thoughts? On Tue, May 7, 2024 at 8:33 AM Loic Greffier wrote: Hi Matthias, To sum up with the ProductionExceptionHandler callback methods (106) proposed changes. A new method ProductionExceptionHandler#handle is added with the following signature: ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, final ProducerRecord record, final Exception exception); The ProducerRecord parameter has changed to accept a serialized or non-serialized record. Thus, the new ProductionExceptionHandler#handle method can handle both production exceptions and serialization exceptions. Both old ProductionExceptionHandler#handle and ProductionExceptionHandler#handleSerializationException methods are now deprecated. The old ProductionExceptionHandler#handle method gets a default implementation, so users do not have to implement a deprecated method. To handle backward compatibility, the new ProductionExceptionHandler#handle method gets a default implementation.
default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                  final ProducerRecord record,
                                                  final Exception exception) {
    if (exception instanceof RecordSerializationException) {
        return this.handleSerializationException(record, exception.getCause());
    }

    return handle((ProducerRecord<byte[], byte[]>) record, exception);
}

The default implementation either invokes #handleSerializationException or #handle depending on the type of the exception, thus users still relying on deprecated ProductionExceptionHandler#handle or ProductionExceptionHandler#handleSerializationException custom implementations won't break. The new ProductionExceptionHandler#handle method is now invoked in case of a serialization exception:

public void send(final String topic, final K key, final V value, ...) {
    try {
        keyBytes = keySerializer.serialize(topic, headers, key);
        ...
    } catch (final ClassCastException exception) {
        ...
    } catch (final Exception exception) {
        try {
            response = productionExceptionHandler.handle(context, record,
                    new RecordSerializationException(SerializationExceptionOrigin.KEY, exception));
        } catch (final Exception e) {
            ...
        }
    }
}

To wrap the origin serialization exception and determine whether it comes from the key or the value, a new exception class is created:

public class RecordSerializationException extends SerializationException {
    public enum SerializationExceptionOrigin {
        KEY,
        VALUE
    }

    public RecordSerializationException(SerializationExceptionOrigin origin, final Throwable cause);
}
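The dispatch Loïc describes can be demonstrated with a self-contained sketch. The types below are deliberately simplified stand-ins (not the real org.apache.kafka classes; ErrorHandlerContext is faked as a String). The point is only that a handler implementing just the old, deprecated callback keeps working through the new unified entry point's default implementation.

```java
// Simplified stand-ins for the types discussed above; not the actual
// org.apache.kafka interfaces -- ErrorHandlerContext is faked as a String.
class HandlerCompatDemo {
    enum Response { CONTINUE, FAIL }

    record Rec(Object key, Object value) {}

    static class RecordSerializationException extends RuntimeException {
        RecordSerializationException(Throwable cause) { super(cause); }
    }

    interface Handler {
        // Old, now-deprecated callbacks that existing user code may implement.
        default Response handle(Rec record, Exception exception) {
            return Response.FAIL;
        }
        default Response handleSerializationException(Rec record, Exception exception) {
            return Response.FAIL;
        }

        // New unified callback: its default implementation dispatches to the
        // old callbacks, so legacy handler implementations keep working.
        default Response handle(String context, Rec record, Exception exception) {
            if (exception instanceof RecordSerializationException) {
                return handleSerializationException(record, (Exception) exception.getCause());
            }
            return handle(record, exception);
        }
    }

    // A legacy handler that only overrides the old serialization callback.
    static class LegacyHandler implements Handler {
        @Override
        public Response handleSerializationException(Rec record, Exception exception) {
            return Response.CONTINUE; // skip records that fail to serialize
        }
    }

    public static void main(String[] args) {
        Handler h = new LegacyHandler();
        Response onSerError = h.handle("ctx", new Rec("k", "v"),
                new RecordSerializationException(new IllegalArgumentException("bad")));
        Response onSendError = h.handle("ctx", new Rec("k", "v"),
                new RuntimeException("send failed"));
        System.out.println(onSerError + " " + onSendError);
    }
}
```

The legacy handler's serialization choice (CONTINUE) is honored via the new three-argument entry point, while other errors fall through to the old general callback.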
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing
First of all, I agree that it makes sense to maintain the two separate callbacks for the ProductionExceptionHandler, since one of them is specifically for serialization exceptions while the other is used for everything/anything else. I also think we can take advantage of this fact to simplify things a bit and cut down on the amount of new stuff added to the API by just adding a parameter to the #handleSerializationException callback and use that to pass in the SerializationExceptionOrigin enum to distinguish between key vs value. This way we wouldn't need to introduce yet another exception type (the RecordSerializationException) just to pass in this information. Thoughts? On Tue, May 7, 2024 at 8:33 AM Loic Greffier wrote: > Hi Matthias, > > To sum up with the ProductionExceptionHandler callback methods (106) > proposed changes. > > A new method ProductionExceptionHandler#handle is added with the following > signature: > > > ProductionExceptionHandlerResponse handle(final ErrorHandlerContext > context, final ProducerRecord record, final Exception exception); > > The ProducerRecord parameter has changed to accept a serialized or > non-serialized record. > Thus, the new ProductionExceptionHandler#handle method can handle both > production exceptions and serialization exceptions. > > Both old ProductionExceptionHandler#handle and > ProductionExceptionHandler#handleSerializationException methods are now > deprecated. > The old ProductionExceptionHandler#handle method gets a default > implementation, so users do not have to implement a deprecated method. > > To handle backward compatibility, the new > ProductionExceptionHandler#handle method gets a default implementation.
> default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, final ProducerRecord record, final Exception exception) {
>     if (exception instanceof RecordSerializationException) {
>         return this.handleSerializationException(record, exception.getCause());
>     }
>
>     return handle((ProducerRecord<byte[], byte[]>) record, exception);
> }
>
> The default implementation either invokes #handleSerializationException or #handle depending on the type of the exception, thus users still relying on deprecated ProductionExceptionHandler#handle or ProductionExceptionHandler#handleSerializationException custom implementations won't break.
>
> The new ProductionExceptionHandler#handle method is now invoked in case of a serialization exception:
>
> public void send(final String topic, final K key, final V value, ...) {
>     try {
>         keyBytes = keySerializer.serialize(topic, headers, key);
>         ...
>     } catch (final ClassCastException exception) {
>         ...
>     } catch (final Exception exception) {
>         try {
>             response = productionExceptionHandler.handle(context, record, new RecordSerializationException(SerializationExceptionOrigin.KEY, exception));
>         } catch (final Exception e) {
>             ...
>         }
>     }
> }
>
> To wrap the origin serialization exception and determine whether it comes from the key or the value, a new exception class is created:
>
> public class RecordSerializationException extends SerializationException {
>     public enum SerializationExceptionOrigin {
>         KEY,
>         VALUE
>     }
>
>     public RecordSerializationException(SerializationExceptionOrigin origin, final Throwable cause);
> }
>
> Hope it all makes sense,
> Loïc
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing
What about (106) to unify both existing callback methods of `ProductionExceptionHandler` into a single one, instead of adding two new ones? Damien's last reply about it was: "I will think about unifying, I do agree it would be cleaner." There was no follow-up on this question, and the KIP right now still proposes to add two new methods, which I believe we could (should?) unify to: default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, final ProducerRecord record, final Exception exception) { Ie, we drop the generics `<byte[], byte[]>` on `ProducerRecord`, which allows you to also pass in a non-serialized ProducerRecord of any type for the serialization error case. Btw: wondering if we also want to pass in a flag/enum about key vs value serialization error, similar to what was proposed in KIP-1036? The only "oddity" would be that we call the handler for other error cases, too, not just for serialization exceptions. But we could tackle this by introducing a new class `RecordSerializationException` which would include the flag and would ensure that KS hands this exception into the handler. This would keep the handler interface/method itself clean. -Matthias On 5/3/24 2:15 AM, Loic Greffier wrote: Hi Bruno, Good catch, KIP has been updated accordingly. Regards, Loïc
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing
Hi Bruno, Good catch, KIP has been updated accordingly. Regards, Loïc
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing
Hi, the KIP looks great! public static final String PROCESS_EXCEPTION_HANDLER_CLASS_CONFIG = "process.exception.handler" needs to be changed to public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG = "processing.exception.handler". The name of the constant has already been corrected in the code block, but the actual name of the config (i.e., the content of the constant) has not been changed yet. Best, Bruno On 5/3/24 10:35 AM, Sebastien Viale wrote: Hi, So, we all agree to revert to the regular Headers interface in ErrorHandlerContext. We will update the KIP accordingly. @Sophie => Yes, this is the last remaining question, and it has been open for voting since last week. Thanks Sébastien From: Andrew Schofield Sent: Friday, 3 May 2024 06:44 To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing Hi, I’ve changed my mind on this one having read through the comments. I don’t think the exception handler should be able to mess with the headers to the detriment of the code that called the handler. While I like the hygiene of having an ImmutableHeaders interface, I feel we can use the existing interface to get the effect we desire. Thanks, Andrew On 3 May 2024, at 03:40, Sophie Blee-Goldman wrote: I tend to agree that we should just return a pure Headers instead of introducing a new class/interface to protect overwriting them. I think a pretty good case has been made already so I won't add onto it, just wanted to voice my support. Is that the only remaining question on this KIP? Might be ok to move to a vote now? On Wed, May 1, 2024 at 8:05 AM Lianet M.
wrote: Hi all, thanks Damien for the KIP! After looking into the KIP and comments, my only concern is aligned with one of Matthias's comments, around the ImmutableHeaders introduction, with the motivation not being clear enough. The existing handlers already expose the headers (indirectly). Ex. ProductionExceptionHandler.handleSerializationException provides the ProducerRecord as an argument, so they are already exposed in those callbacks through record.headers(). Is there a reason to think that it would be a problem to expose the headers in the new ProcessingExceptionHandler, but that it's not a problem for the existing handler? If there is no real concern about the KS engine requiring those headers, it feels hard to mentally justify the complexity we transfer to the user by exposing a new concept into the callbacks to represent the headers. In the end, it strays away from the simple/consistent representation of Headers used all over. Even if eventually the KS engine needs to use the headers after the callbacks with certainty that they were not altered, still feels like it's something we could attempt to solve internally, without having to transfer "new concepts" into the user (ex. the deep-copy as it was suggested, seems like the kind of trade-off that would maybe be acceptable here to gain simplicity and consistency among the handlers with a single existing representation of Headers). Best! Lianet On Tue, Apr 30, 2024 at 9:36 PM Matthias J. Sax wrote: Thanks for the update. I am wondering if we should use `ReadOnlyHeaders` instead of `ImmutableHeaders` as interface name? Also, the returned `Header` interface is technically not immutable either, because `Header#value()` returns a mutable byte-array... Would we need a `ReadOnlyHeader` interface? If yes, it seems that `ReadOnlyHeaders` should not be a super-interface of `Headers` but it would rather be a standalone interface, and a wrapper for a `Headers` instance?
And `ReadOnlyHeader` would return some immutable type instead of `byte[]` for the value()? An alternative would be to deep-copy the value byte-array, which would not be free, but given that we are talking about exception handling, it would not be on the hot code path, and thus might be acceptable? The above seems to increase the complexity significantly though. Hence, I have second thoughts on the immutability question: Do we really need to worry about mutability after all, because in the end, the KS runtime won't read the Headers instance after the handler was called, and if a user modifies the passed-in headers, there won't be any actual damage (ie, no side effects)? For this case, it might even be ok to also not add `ImmutableHeaders` to begin with? Sorry for the forth and back (yes, forth and back, because back and forth does not make sense -- it's not logical -- just trying to fix English :D) as I did bring up the immutability question in the first place...
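As a rough illustration of the deep-copy alternative Matthias mentions, the sketch below copies both the header list and each value array before handing it to (simulated) handler code, so mutations cannot leak back. Header here is a minimal stand-in record, not org.apache.kafka.common.header.Header.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of the deep-copy alternative: hand the handler a copy of the headers
// so mutation by user code cannot affect what the runtime keeps.
class HeaderCopyDemo {
    record Header(String key, byte[] value) {}

    static List<Header> deepCopy(List<Header> headers) {
        List<Header> copy = new ArrayList<>(headers.size());
        for (Header h : headers) {
            // Copy the value array too: a Header value() exposes a mutable byte[].
            copy.add(new Header(h.key(), Arrays.copyOf(h.value(), h.value().length)));
        }
        return copy;
    }

    public static void main(String[] args) {
        List<Header> original = new ArrayList<>(List.of(new Header("trace-id", new byte[]{1, 2})));
        List<Header> forHandler = deepCopy(original);

        // Simulate a handler mutating both a value array and the list itself.
        forHandler.get(0).value()[0] = 99;
        forHandler.clear();

        // The runtime's copy is untouched.
        System.out.println(original.size() + " " + original.get(0).value()[0]);
    }
}
```

The cost is one allocation per header per handler invocation, which, as noted above, is off the hot path since it only happens on exceptions.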
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing
Hi, I’ve changed my mind on this one having read through the comments. I don’t think the exception handler should be able to mess with the headers to the detriment of the code that called the handler. While I like the hygiene of having an ImmutableHeaders interface, I feel we can use the existing interface to get the effect we desire. Thanks, Andrew > On 3 May 2024, at 03:40, Sophie Blee-Goldman wrote: > > I tend to agree that we should just return a pure Headers instead of > introducing a new class/interface to protect overwriting them. I think a > pretty good case has been made already so I won't add onto it, just wanted > to voice my support. > > Is that the only remaining question on this KIP? Might be ok to move to a > vote now? > > On Wed, May 1, 2024 at 8:05 AM Lianet M. wrote: > >> Hi all, thanks Damien for the KIP! >> >> After looking into the KIP and comments, my only concern is aligned with >> one of Matthias's comments, around the ImmutableHeaders introduction, with >> the motivation not being clear enough. The existing handlers already expose >> the headers (indirectly). Ex. >> ProductionExceptionHandler.handleSerializationException provides the >> ProducerRecord as an argument, so they are already exposed in those >> callbacks through record.headers(). Is there a reason to think that it >> would be a problem to expose the headers in the >> new ProcessingExceptionHandler, but that it's not a problem for the >> existing handler? >> >> If there is no real concern about the KS engine requiring those headers, it >> feels hard to mentally justify the complexity we transfer to the user by >> exposing a new concept into the callbacks to represent the headers. In the >> end, it strays away from the simple/consistent representation of Headers >> used all over.
Even if eventually the KS engine needs to use the headers >> after the callbacks with certainty that they were not altered, still feels >> like it's something we could attempt to solve internally, without having to >> transfer "new concepts" into the user (ex. the deep-copy as it was >> suggested, seems like the kind of trade-off that would maybe be acceptable >> here to gain simplicity and consistency among the handlers with a single >> existing representation of Headers). >> >> Best! >> >> Lianet >> >> >> >> On Tue, Apr 30, 2024 at 9:36 PM Matthias J. Sax wrote: >> >>> Thanks for the update. >>> >>> I am wondering if we should use `ReadOnlyHeaders` instead of >>> `ImmutableHeaders` as interface name? >>> >>> Also, the returned `Header` interface is technically not immutable >>> either, because `Header#value()` returns a mutable byte-array... Would we >>> need a `ReadOnlyHeader` interface? >>> >>> If yes, it seems that `ReadOnlyHeaders` should not be a super-interface >>> of `Headers` but it would rather be a standalone interface, and a >>> wrapper for a `Headers` instance? And `ReadOnlyHeader` would return some >>> immutable type instead of `byte[]` for the value()? >>> >>> An alternative would be to deep-copy the value byte-array, which would not >>> be free, but given that we are talking about exception handling, it >>> would not be on the hot code path, and thus might be acceptable? >>> >>> >>> The above seems to increase the complexity significantly though. Hence, >>> I have second thoughts on the immutability question: >>> >>> Do we really need to worry about mutability after all, because in the >>> end, KS runtime won't read the Headers instance after the handler was >>> called, and if a user modifies the passed in headers, there won't be any >>> actual damage (ie, no side effects)? For this case, it might even be ok >>> to also not add `ImmutableHeaders` to begin with?
>>> >>> >>> Sorry for the forth and back (yes, forth and back, because back and >>> forth does not make sense -- it's not logical -- just trying to fix >>> English :D) as I did bring up the immutability question in the first >>> place... >>> >>> >>> >>> -Matthias >>> >>> On 4/25/24 5:56 AM, Loic Greffier wrote: Hi Matthias, I have updated the KIP regarding points 103 and 108. 103. I have suggested a new `ImmutableHeaders` interface to deal with the immutability concern of the headers, which is basically the `Headers` interface without the write accesses. public interface ImmutableHeaders { Header lastHeader(String key); Iterable<Header> headers(String key); Header[] toArray(); } The `Headers` interface can be updated accordingly: public interface Headers extends ImmutableHeaders, Iterable<Header> { //… } Loïc >>> >>
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing
I tend to agree that we should just return a pure Headers instead of introducing a new class/interface to protect overwriting them. I think a pretty good case has been made already so I won't add onto it, just wanted to voice my support. Is that the only remaining question on this KIP? Might be ok to move to a vote now? On Wed, May 1, 2024 at 8:05 AM Lianet M. wrote: > Hi all, thanks Damien for the KIP! > > After looking into the KIP and comments, my only concern is aligned with > one of Matthias's comments, around the ImmutableHeaders introduction, with > the motivation not being clear enough. The existing handlers already expose > the headers (indirectly). Ex. > ProductionExceptionHandler.handleSerializationException provides the > ProducerRecord as an argument, so they are already exposed in those > callbacks through record.headers(). Is there a reason to think that it > would be a problem to expose the headers in the > new ProcessingExceptionHandler, but that it's not a problem for the > existing handler? > > If there is no real concern about the KS engine requiring those headers, it > feels hard to mentally justify the complexity we transfer to the user by > exposing a new concept into the callbacks to represent the headers. In the > end, it strays away from the simple/consistent representation of Headers > used all over. Even if eventually the KS engine needs to use the headers > after the callbacks with certainty that they were not altered, still feels > like it's something we could attempt to solve internally, without having to > transfer "new concepts" into the user (ex. the deep-copy as it was > suggested, seems like the kind of trade-off that would maybe be acceptable > here to gain simplicity and consistency among the handlers with a single > existing representation of Headers). > > Best! > > Lianet > > > > On Tue, Apr 30, 2024 at 9:36 PM Matthias J. Sax wrote: > > > Thanks for the update.
> > > > I am wondering if we should use `ReadOnlyHeaders` instead of > > `ImmutableHeaders` as interface name? > > > > Also, the returned `Header` interface is technically not immutable > > either, because `Header#value()` returns a mutable byte-array... Would we > > need a `ReadOnlyHeader` interface? > > > > If yes, it seems that `ReadOnlyHeaders` should not be a super-interface > > of `Headers` but it would rather be a standalone interface, and a > > wrapper for a `Headers` instance? And `ReadOnlyHeader` would return some > > immutable type instead of `byte[]` for the value()? > > > > An alternative would be to deep-copy the value byte-array, which would not > > be free, but given that we are talking about exception handling, it > > would not be on the hot code path, and thus might be acceptable? > > > > > > The above seems to increase the complexity significantly though. Hence, > > I have second thoughts on the immutability question: > > > > Do we really need to worry about mutability after all, because in the > > end, KS runtime won't read the Headers instance after the handler was > > called, and if a user modifies the passed in headers, there won't be any > > actual damage (ie, no side effects)? For this case, it might even be ok > > to also not add `ImmutableHeaders` to begin with? > > > > > > > > Sorry for the forth and back (yes, forth and back, because back and > > forth does not make sense -- it's not logical -- just trying to fix > > English :D) as I did bring up the immutability question in the first > > place... > > > > > > > > -Matthias > > > > On 4/25/24 5:56 AM, Loic Greffier wrote: > > > Hi Matthias, > > > > > > I have updated the KIP regarding points 103 and 108. > > > > > > 103. > > > I have suggested a new `ImmutableHeaders` interface to deal with the > > > immutability concern of the headers, which is basically the `Headers` > > > interface without the write accesses.
> > > > > > public interface ImmutableHeaders { > > > Header lastHeader(String key); > > > Iterable<Header> headers(String key); > > > Header[] toArray(); > > > } > > > > > > The `Headers` interface can be updated accordingly: > > > > > > public interface Headers extends ImmutableHeaders, Iterable<Header> { > > > //… > > > } > > > > > > Loïc > > >
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing
Hi all, thanks Damien for the KIP! After looking into the KIP and comments, my only concern is aligned with one of Matthias's comments, around the ImmutableHeaders introduction, with the motivation not being clear enough. The existing handlers already expose the headers (indirectly). Ex. ProductionExceptionHandler.handleSerializationException provides the ProducerRecord as an argument, so they are already exposed in those callbacks through record.headers(). Is there a reason to think that it would be a problem to expose the headers in the new ProcessingExceptionHandler, but that it's not a problem for the existing handler? If there is no real concern about the KS engine requiring those headers, it feels hard to mentally justify the complexity we transfer to the user by exposing a new concept into the callbacks to represent the headers. In the end, it strays away from the simple/consistent representation of Headers used all over. Even if eventually the KS engine needs to use the headers after the callbacks with certainty that they were not altered, still feels like it's something we could attempt to solve internally, without having to transfer "new concepts" into the user (ex. the deep-copy as it was suggested, seems like the kind of trade-off that would maybe be acceptable here to gain simplicity and consistency among the handlers with a single existing representation of Headers). Best! Lianet On Tue, Apr 30, 2024 at 9:36 PM Matthias J. Sax wrote: > Thanks for the update. > > I am wondering if we should use `ReadOnlyHeaders` instead of > `ImmutableHeaders` as interface name? > > Also, the returned `Header` interface is technically not immutable > either, because `Header#value()` returns a mutable byte-array... Would we > need a `ReadOnlyHeader` interface? > > If yes, it seems that `ReadOnlyHeaders` should not be a super-interface > of `Headers` but it would rather be a standalone interface, and a > wrapper for a `Headers` instance?
And `ReadOnlyHeader` would return some > immutable type instead of `byte[]` for the value()? > > An alternative would be to deep-copy the value byte-array, which would not > be free, but given that we are talking about exception handling, it > would not be on the hot code path, and thus might be acceptable? > > > The above seems to increase the complexity significantly though. Hence, > I have second thoughts on the immutability question: > > Do we really need to worry about mutability after all, because in the > end, KS runtime won't read the Headers instance after the handler was > called, and if a user modifies the passed in headers, there won't be any > actual damage (ie, no side effects)? For this case, it might even be ok > to also not add `ImmutableHeaders` to begin with? > > > > Sorry for the forth and back (yes, forth and back, because back and > forth does not make sense -- it's not logical -- just trying to fix > English :D) as I did bring up the immutability question in the first > place... > > > > -Matthias > > On 4/25/24 5:56 AM, Loic Greffier wrote: > > Hi Matthias, > > > > I have updated the KIP regarding points 103 and 108. > > > > 103. > > I have suggested a new `ImmutableHeaders` interface to deal with the > > immutability concern of the headers, which is basically the `Headers` > > interface without the write accesses. > > > > public interface ImmutableHeaders { > > Header lastHeader(String key); > > Iterable<Header> headers(String key); > > Header[] toArray(); > > } > > > > The `Headers` interface can be updated accordingly: > > > > public interface Headers extends ImmutableHeaders, Iterable<Header> { > > //… > > } > > > > Loïc >
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing
Thanks for the update. I am wondering if we should use `ReadOnlyHeaders` instead of `ImmutableHeaders` as the interface name? Also, the returned `Header` interface is technically not immutable either, because `Header#value()` returns a mutable byte-array... Would we need a `ReadOnlyHeader` interface? If yes, it seems that `ReadOnlyHeaders` should not be a super-interface of `Headers` but it would rather be a standalone interface, and a wrapper for a `Headers` instance? And `ReadOnlyHeader` would return some immutable type instead of `byte[]` for the value()? An alternative would be to deep-copy the value byte-array, which would not be free, but given that we are talking about exception handling, it would not be on the hot code path, and thus might be acceptable? The above seems to increase the complexity significantly though. Hence, I have second thoughts on the immutability question: Do we really need to worry about mutability after all, because in the end, the KS runtime won't read the Headers instance after the handler was called, and if a user modifies the passed-in headers, there won't be any actual damage (ie, no side effects)? For this case, it might even be ok to also not add `ImmutableHeaders` to begin with? Sorry for the forth and back (yes, forth and back, because back and forth does not make sense -- it's not logical -- just trying to fix English :D) as I did bring up the immutability question in the first place... -Matthias On 4/25/24 5:56 AM, Loic Greffier wrote: Hi Matthias, I have updated the KIP regarding points 103 and 108. 103. I have suggested a new `ImmutableHeaders` interface to deal with the immutability concern of the headers, which is basically the `Headers` interface without the write accesses. public interface ImmutableHeaders { Header lastHeader(String key); Iterable<Header> headers(String key); Header[] toArray(); } The `Headers` interface can be updated accordingly: public interface Headers extends ImmutableHeaders, Iterable<Header> { //… } Loïc
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing
…existing handlers in a separate KIP, what do you think? Maybe I am overthinking this part and the ProcessingContext would be fine.

4. Good point regarding the dropped-record metric: as it is used by the other handlers, I do think it makes sense to leverage it instead of creating a new metric. I will update the KIP to use the dropped-record metric.

8. Regarding the DSL, I am aligned with Bruno; I think we could close the gaps in a future KIP.

Cheers,
Damien

On Thu, 11 Apr 2024 at 11:56, Bruno Cadonna wrote:
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing
On Thu, 11 Apr 2024 at 11:56, Bruno Cadonna wrote:

Hi Matthias,

1.a
With processor node ID, I mean the ID that is exposed in the tags of processor node metrics. That ID cannot be internal since it is exposed in metrics. I think the processor name and the processor node ID are the same thing. I followed how the processor node ID is set in metrics and I ended up in addProcessor(name, ...).

1.b
Regarding ProcessingContext, I also thought about a separate class to pass context information into the handler, but then I dismissed the idea because I thought I was overthinking it. Apparently, I was not overthinking it if you also had the same idea. So let's consider a separate class.

4.
Regarding the metric, thanks for pointing to the dropped-record metric, Matthias. The dropped-record metric is used with the deserialization handler and the production handler. So, it would make sense to also use it for this handler. However, the dropped-record metric only records records that are skipped by the handler and not the number of calls to the handler. But that difference is probably irrelevant since in case of FAIL, the metric will be reset anyways since the stream thread will be restarted. In conclusion, I think the dropped-record metric in combination with a warn log message might be the better choice than introducing a new metric.

8.
Regarding the DSL, I think we should close possible gaps in a separate KIP.

Best,
Bruno

On 4/11/24 12:06 AM, Matthias J. Sax wrote:

Thanks for the KIP. Great discussion.

I am not sure if I understand the proposal from Bruno to hand in the processor node id? Isn't this internal (could not even find it quickly). We do have a processor name, right? Or do I mix up something?

Another question is about `ProcessingContext` -- it contains a lot of (potentially irrelevant?) metadata. We should think carefully about what we want to pass in and what not -- removing stuff is hard, but adding stuff is easy. It's always an option to create a new interface that only exposes stuff we find useful, and allows us to evolve this interface independently of others. Re-using an existing interface always has the danger of introducing an undesired coupling that could bite us in the future. -- It makes total sense to pass in `RecordMetadata`, but `ProcessingContext` (even if already limited compared to `ProcessorContext`) still seems to be too broad? For example, there are `getStateStore()` and `schedule()` methods which I think we should not expose.

The other interesting question is about "what record gets passed in". For the PAPI, passing in the Processor's input record makes a lot of sense. However, for DSL operators, I am not 100% sure? The DSL often uses internal types not exposed to the user, and thus I am not sure if users could write useful code for this case? -- In general, I still agree that the handler should be implemented with a try-catch around `Processor.process()`, but it might not be too useful for DSL processors. Hence, I am wondering if we need to do something more in the DSL? I don't have a concrete proposal (a few high-level ideas only), and if we don't do anything special for the DSL I am ok with moving forward with this KIP as-is, but we should be aware of potential limitations for DSL users. We can always do a follow-up KIP to close gaps when we understand the impact better -- covering the DSL would also expand the scope of this KIP significantly...

About the metric: just to double check. Do we think it's worth it to add a new metric? Or could we re-use the existing "dropped record metric"?

-Matthias

On 4/10/24 5:11 AM, Sebastien Viale wrote:

Hi,

You are right, it will simplify types.

We update the KIP.

Regards,

Sébastien VIALE
MICHELIN GROUP - Information Technology
Technical Expert Kafka
Carmes / Bâtiment A17 4e / 63100 Clermont-Ferrand

De : Bruno Cadonna
Envoyé : mercredi 10 avril 2024 10:38
À : dev@kafka.apache.org
Objet : [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

Hi Loïc, Damien, and Sébastien,

Great that we are converging!

3.
Damien and Loïc, I think in your examples the handler will receive a Record because a Record is passed to the processor in the following code line:
https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152

I see that we do not need to pass into the handler a Record<byte[], byte[]> just because we do that for the DeserializationExceptionHandler and the ProductionExceptionHandler. When those two handlers are called, the record is already serialized. This is not the case for the ProcessingExceptionHandler. However, I would propose to use Record<?, ?> for the record that is passed to the ProcessingExceptionHandler because it makes the handler API more flexible.

Best,
Bruno
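A narrow, purpose-built interface along the lines Matthias suggests could look something like the sketch below: only read-only metadata reaches the handler, and nothing like `getStateStore()` or `schedule()` leaks through. All names here are illustrative assumptions, not the KIP's final API:

```java
// Hypothetical narrow metadata view for exception handlers. Being a separate
// interface, it can evolve independently of ProcessingContext and exposes
// only immutable information.
interface ErrorHandlerContextSketch {
    String topic();
    int partition();
    long offset();
    String processorNodeId(); // the ID exposed in processor-node metric tags

    static ErrorHandlerContextSketch of(String topic, int partition, long offset, String nodeId) {
        return new ErrorHandlerContextSketch() {
            @Override public String topic() { return topic; }
            @Override public int partition() { return partition; }
            @Override public long offset() { return offset; }
            @Override public String processorNodeId() { return nodeId; }
        };
    }
}
```

Adding a getter later is backward compatible; removing one from a broad reused interface would not be, which is the "removing stuff is hard, adding stuff is easy" point above.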
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing
Hi Loïc, Damien, and Sébastien,

Great that we are converging!

3.
Damien and Loïc, I think in your examples the handler will receive a Record because a Record is passed to the processor in the following code line:
https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152

I see that we do not need to pass into the handler a Record<byte[], byte[]> just because we do that for the DeserializationExceptionHandler and the ProductionExceptionHandler. When those two handlers are called, the record is already serialized. This is not the case for the ProcessingExceptionHandler. However, I would propose to use Record<?, ?> for the record that is passed to the ProcessingExceptionHandler because it makes the handler API more flexible.

Best,
Bruno

On 4/9/24 9:09 PM, Loic Greffier wrote:

Hi Bruno and Bill,

To complete Damien's purposes about the point 3.

Processing errors are caught and handled by the ProcessingErrorHandler, at the precise moment when records are processed by processor nodes. The handling will be performed in the "process" method of the ProcessorNode, such as:

public void process(final Record record) {
    ...
    try {
        ...
    } catch (final ClassCastException e) {
        ...
    } catch (Exception e) {
        ProcessingExceptionHandler.ProcessingHandlerResponse response = this.processingExceptionHandler
            .handle(internalProcessorContext, (Record) record, e);

        if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
            throw new StreamsException("Processing exception handler is set to fail upon" +
                " a processing error. If you would rather have the streaming pipeline" +
                " continue after a processing error, please set the " +
                DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", e);
        }
    }
}

As you can see, the record is transmitted to the ProcessingExceptionHandler as a Record, as we are dealing with the input…
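For illustration, a user-supplied handler that the try-catch above would invoke might look like the sketch below. The enum and interface here are reduced stand-ins for the KIP's proposed types, so the example compiles on its own:

```java
// Stand-ins for the proposed ProcessingExceptionHandler API (simplified: the
// real proposal passes the processing context and the failed record).
final class ProcessingHandlerSketch {

    enum ProcessingHandlerResponse { CONTINUE, FAIL }

    interface ProcessingExceptionHandler {
        ProcessingHandlerResponse handle(Object context, Object record, Exception exception);
    }

    // Skip records failing with an "expected" business error; fail the stream
    // thread on anything unexpected.
    static final ProcessingExceptionHandler CONTINUE_ON_BAD_INPUT =
        (context, record, exception) ->
            exception instanceof IllegalArgumentException
                ? ProcessingHandlerResponse.CONTINUE  // drop the poison record
                : ProcessingHandlerResponse.FAIL;     // surface everything else
}
```

Returning CONTINUE corresponds to recording the skip in the dropped-record metric, while FAIL makes ProcessorNode throw the StreamsException shown above.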
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing
Hi Matthias, 1.a With processor node ID, I mean the ID that is exposed in the tags of processor node metrics. That ID cannot be internal since it is exposed in metrics. I think the processor name and the processor node ID is the same thing. I followed how the processor node ID is set in metrics and I ended up in addProcessor(name, ...). 1.b Regarding ProcessingContext, I also thought about a separate class to pass-in context information into the handler, but then I dismissed the idea because I thought I was overthinking it. Apparently, I was not overthinking it if you also had the same idea. So let's consider a separate class. 4. Regarding the metric, thanks for pointing to the dropped-record metric, Matthias. The dropped-record metric is used with the deserialization handler and the production handler. So, it would make sense to also use it for this handler. However, the dropped-record metric only records records that are skipped by the handler and not the number of calls to the handler. But that difference is probably irrelevant since in case of FAIL, the metric will be reset anyways since the stream thread will be restarted. In conclusion, I think the dropped-record metric in combination with a warn log message might be the better choice to introducing a new metric. 8. Regarding the DSL, I think we should close possible gaps in a separate KIP. Best, Bruno On 4/11/24 12:06 AM, Matthias J. Sax wrote: Thanks for the KIP. Great discussion. I am not sure if I understand the proposal from Bruno to hand in the processor node id? Isn't this internal (could not even find it quickly). We do have a processor name, right? Or do I mix up something? Another question is about `ProcessingContext` -- it contains a lot of (potentially irrelevant?) metadata. We should think carefully about what we want to pass in and what not -- removing stuff is hard, but adding stuff is easy. 
It's always an option to create a new interface that only exposes stuff we find useful, and allows us to evolve this interface independent of others. Re-using an existing interface always has the danger to introduce an undesired coupling that could bite us in the future. -- It make total sense to pass in `RecordMetadata`, but `ProcessingContext` (even if already limited compared to `ProcessorContext`) still seems to be too broad? For example, there is `getStateStore()` and `schedule()` methods which I think we should not expose. The other interesting question is about "what record gets passed in". For the PAPI, passing in the Processor's input record make a lot of sense. However, for DSL operators, I am not 100% sure? The DSL often uses internal types not exposed to the user, and thus I am not sure if users could write useful code for this case? -- In general, I still agree that the handler should be implement with a try-catch around `Processor.process()` but it might not be too useful for DSL processor. Hence, I am wondering if we need to so something more in the DSL? I don't have a concrete proposal (a few high level ideas only) and if we don't do anything special for the DSL I am ok with moving forward with this KIP as-is, but we should be aware of potential limitations for DSL users. We can always do a follow up KIP to close gaps when we understand the impact better -- covering the DSL would also expand the scope of this KIP significantly... About the metric: just to double check. Do we think it's worth to add a new metric? Or could we re-use the existing "dropped record metric"? -Matthias On 4/10/24 5:11 AM, Sebastien Viale wrote: Hi, You are right, it will simplify types. 
We updated the KIP. Regards, Sébastien VIALE - MICHELIN GROUP - Information Technology - Technical Expert Kafka - Carmes / Bâtiment A17 4e / 63100 Clermont-Ferrand

From: Bruno Cadonna Sent: Wednesday, April 10, 2024 10:38 To: dev@kafka.apache.org Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

Warning External sender Do not click on any links or open any attachments unless you trust the sender and know the content is safe.

Hi Loïc, Damien, and Sébastien, Great that we are converging!

3. Damien and Loïc, I think in your examples the handler will receive a Record because a Record is passed to the processor in the following code line: https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152 I see that we do not need to pass into the handler a Record<byte[], byte[]> just because we do that for the DeserializationExceptionHandler and the ProductionExceptionHandler.
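The dedicated, narrow context interface that Bruno and Matthias discuss above could look roughly like the sketch below. All names here (ErrorHandlerContext, its accessors, and the trivial implementation) are illustrative stand-ins, not the API proposed in the KIP:

```java
// Hypothetical narrow, read-only context for exception handlers.
// Unlike ProcessingContext, it deliberately omits getStateStore(),
// schedule(), and any forwarding APIs.
interface ErrorHandlerContext {
    String applicationId();
    String taskId();
    String processorNodeId(); // the processor name, as exposed in metric tags
    String topic();
    int partition();
    long offset();
}

// Trivial immutable implementation, just to show the shape of the contract.
final class SimpleErrorHandlerContext implements ErrorHandlerContext {
    private final String applicationId;
    private final String taskId;
    private final String processorNodeId;
    private final String topic;
    private final int partition;
    private final long offset;

    SimpleErrorHandlerContext(String applicationId, String taskId, String processorNodeId,
                              String topic, int partition, long offset) {
        this.applicationId = applicationId;
        this.taskId = taskId;
        this.processorNodeId = processorNodeId;
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    public String applicationId() { return applicationId; }
    public String taskId() { return taskId; }
    public String processorNodeId() { return processorNodeId; }
    public String topic() { return topic; }
    public int partition() { return partition; }
    public long offset() { return offset; }
}
```

Because such an interface is owned by the handler API, it can evolve independently of ProcessorContext, which is exactly the decoupling argument made above.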
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing
Thanks for the KIP. Great discussion. I am not sure I understand the proposal from Bruno to hand in the processor node ID? Isn't this internal (I could not even find it quickly)? We do have a processor name, right? Or am I mixing something up? Another question is about `ProcessingContext` -- it contains a lot of (potentially irrelevant?) metadata. We should think carefully about what we want to pass in and what not -- removing stuff is hard, but adding stuff is easy. It's always an option to create a new interface that only exposes stuff we find useful, and allows us to evolve this interface independently of others. Re-using an existing interface always has the danger of introducing undesired coupling that could bite us in the future. -- It makes total sense to pass in `RecordMetadata`, but `ProcessingContext` (even if already limited compared to `ProcessorContext`) still seems to be too broad? For example, there are the `getStateStore()` and `schedule()` methods, which I think we should not expose. The other interesting question is about "what record gets passed in". For the PAPI, passing in the Processor's input record makes a lot of sense. However, for DSL operators, I am not 100% sure? The DSL often uses internal types not exposed to the user, and thus I am not sure if users could write useful code for this case? -- In general, I still agree that the handler should be implemented with a try-catch around `Processor.process()`, but it might not be too useful for DSL processors. Hence, I am wondering if we need to do something more for the DSL? I don't have a concrete proposal (a few high-level ideas only), and if we don't do anything special for the DSL I am ok with moving forward with this KIP as-is, but we should be aware of potential limitations for DSL users. We can always do a follow-up KIP to close gaps when we understand the impact better -- covering the DSL would also expand the scope of this KIP significantly... About the metric: just to double check.
Do we think it's worth adding a new metric? Or could we re-use the existing "dropped-records" metric? -Matthias

On 4/10/24 5:11 AM, Sebastien Viale wrote: Hi, You are right, it will simplify types. We updated the KIP. Regards, Sébastien VIALE - MICHELIN GROUP - Information Technology - Technical Expert Kafka - Carmes / Bâtiment A17 4e / 63100 Clermont-Ferrand

From: Bruno Cadonna Sent: Wednesday, April 10, 2024 10:38 To: dev@kafka.apache.org Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

Hi Loïc, Damien, and Sébastien, Great that we are converging!

3. Damien and Loïc, I think in your examples the handler will receive a Record because a Record is passed to the processor in the following code line: https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152 I see that we do not need to pass into the handler a Record<byte[], byte[]> just because we do that for the DeserializationExceptionHandler and the ProductionExceptionHandler. When those two handlers are called, the record is already serialized. This is not the case for the ProcessingExceptionHandler. However, I would propose to use Record for the record that is passed to the ProcessingExceptionHandler because it makes the handler API more flexible. Best, Bruno

On 4/9/24 9:09 PM, Loic Greffier wrote: > Hi Bruno and Bill, > > To complete Damien's points about point 3.
> > Processing errors are caught and handled by the ProcessingExceptionHandler, at the precise moment when records are processed by processor nodes. The handling will be performed in the "process" method of the ProcessorNode, such as:
> >
> > public void process(final Record<KIn, VIn> record) {
> >     ...
> >     try {
> >         ...
> >     } catch (final ClassCastException e) {
> >         ...
> >     } catch (Exception e) {
> >         ProcessingExceptionHandler.ProcessingHandlerResponse response = this.processingExceptionHandler
> >             .handle(internalProcessorContext, (Record<Object, Object>) record, e);
> >
> >         if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
> >             throw new StreamsException("Processing exception handler is set to fail upon" +
> >                 " a processing error. If you would rather have the streaming pipeline" +
> >                 " continue after a processing error, please set the " +
> >                 DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", e);
> >         }
> >     }
> > }
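The try/catch dispatch Loïc describes can be sketched in a self-contained form. The types below (HandlerResponse, ProcessingRecord, Handler, NodeSketch) are simplified stand-ins for the real Kafka Streams classes, and the real handler also receives the processing context:

```java
// Self-contained sketch of the ProcessorNode try/catch dispatch described above.
enum HandlerResponse { FAIL, CONTINUE }

final class ProcessingRecord<K, V> {
    final K key;
    final V value;
    ProcessingRecord(K key, V value) { this.key = key; this.value = value; }
}

@FunctionalInterface
interface Handler {
    HandlerResponse handle(ProcessingRecord<?, ?> record, Exception exception);
}

final class NodeSketch {
    private final Handler handler;
    NodeSketch(Handler handler) { this.handler = handler; }

    <K, V> void process(ProcessingRecord<K, V> record, Runnable userLogic) {
        try {
            userLogic.run();
        } catch (ClassCastException e) {
            throw e; // serde misconfiguration stays fatal and bypasses the handler
        } catch (Exception e) {
            if (handler.handle(record, e) == HandlerResponse.FAIL) {
                throw new RuntimeException(
                    "Processing exception handler is set to fail upon a processing error", e);
            }
            // CONTINUE: drop the faulty record and move on to the next one
        }
    }
}
```

With a CONTINUE handler the faulty record is simply skipped; with FAIL the wrapped exception propagates and kills the stream thread, which is why the dropped-records metric discussion above focuses on the CONTINUE path.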
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing
Hi Loïc, Damien, and Sébastien, Great that we are converging!

3. Damien and Loïc, I think in your examples the handler will receive a Record because a Record is passed to the processor in the following code line: https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152 I see that we do not need to pass into the handler a Record<byte[], byte[]> just because we do that for the DeserializationExceptionHandler and the ProductionExceptionHandler. When those two handlers are called, the record is already serialized. This is not the case for the ProcessingExceptionHandler. However, I would propose to use Record for the record that is passed to the ProcessingExceptionHandler because it makes the handler API more flexible. Best, Bruno

On 4/9/24 9:09 PM, Loic Greffier wrote: Hi Bruno and Bill, To complete Damien's points about point 3. Processing errors are caught and handled by the ProcessingExceptionHandler, at the precise moment when records are processed by processor nodes. The handling will be performed in the "process" method of the ProcessorNode, such as:

public void process(final Record<KIn, VIn> record) {
    ...
    try {
        ...
    } catch (final ClassCastException e) {
        ...
    } catch (Exception e) {
        ProcessingExceptionHandler.ProcessingHandlerResponse response = this.processingExceptionHandler
            .handle(internalProcessorContext, (Record<Object, Object>) record, e);

        if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
            throw new StreamsException("Processing exception handler is set to fail upon" +
                " a processing error. If you would rather have the streaming pipeline" +
                " continue after a processing error, please set the " +
                DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", e);
        }
    }
}

As you can see, the record is transmitted to the ProcessingExceptionHandler as a Record, as we are dealing with the input record of the processor at this point.
It can be any type, including non-serializable types, as suggested by Damien's example. As the ProcessingExceptionHandler is not intended to perform any serialization, there should be no issue for users handling a Record. I follow Damien on the other points.

For point 6, the underlying public interfaces are renamed as well: - The ProcessingHandlerResponse - The ProcessingLogAndContinueExceptionHandler/ProcessingLogAndFailExceptionHandler - The configuration DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG (default.processing.exception.handler)

Regards, Loïc

From: Damien Gasparina Sent: Tuesday, April 9, 2024 20:08 To: dev@kafka.apache.org Subject: Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

Hi Bruno, Bill, First of all, thanks a lot for all your useful comments.

1. and 2. I am wondering whether we should expose the processor node ID -- which basically is the processor node name -- in the ProcessingContext interface. I think the processor node ID fits well in the ProcessingContext interface since it already contains the application ID and task ID, and it would make the API for the handler cleaner.

That's a good point, the actual ProcessorContextImpl is already holding the current node in an attribute (currentNode), thus exposing the node ID should not be a problem. Let me sleep on it and get back to you regarding this point.

3. Could you elaborate -- maybe with an example -- when a record is in a state in which it cannot be serialized? This is not completely clear to me.

The Record passed to the handler is the input record to the processor. In the Kafka Streams API, it could be any POJO, e.g.
with the following topology:

streamsBuilder.stream("x")
    .map((k, v) -> new KeyValue<>("foo", Pair.of("hello", "world")))
    .forEach((k, v) -> { throw new RuntimeException(); });

I would expect the handler to receive a Record<String, Pair<String, String>>.

4. Regarding the metrics, it is not entirely clear to me what the metric measures. Is it the number of calls to the process handler or is it the number of calls to the process handler that returned FAIL? If it is the former, I was also wondering whether it would be better to put the task-level metrics at INFO reporting level and remove the thread-level metric, similar to the dropped-records metric. You can always roll up the metrics to the thread level in your preferred monitoring system. Or do you think we end up with too many metrics?

We were thinking of the former, measuring the number of calls to the process handler.
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing
Hi Bruno and Bill, To complete Damien's points about point 3.

Processing errors are caught and handled by the ProcessingExceptionHandler, at the precise moment when records are processed by processor nodes. The handling will be performed in the "process" method of the ProcessorNode, such as:

public void process(final Record<KIn, VIn> record) {
    ...
    try {
        ...
    } catch (final ClassCastException e) {
        ...
    } catch (Exception e) {
        ProcessingExceptionHandler.ProcessingHandlerResponse response = this.processingExceptionHandler
            .handle(internalProcessorContext, (Record<Object, Object>) record, e);

        if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
            throw new StreamsException("Processing exception handler is set to fail upon" +
                " a processing error. If you would rather have the streaming pipeline" +
                " continue after a processing error, please set the " +
                DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", e);
        }
    }
}

As you can see, the record is transmitted to the ProcessingExceptionHandler as a Record, as we are dealing with the input record of the processor at this point.
It can be any type, including non-serializable types, as suggested by Damien's example. As the ProcessingExceptionHandler is not intended to perform any serialization, there should be no issue for users handling a Record. I follow Damien on the other points.

For point 6, the underlying public interfaces are renamed as well: - The ProcessingHandlerResponse - The ProcessingLogAndContinueExceptionHandler/ProcessingLogAndFailExceptionHandler - The configuration DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG (default.processing.exception.handler)

Regards, Loïc

From: Damien Gasparina Sent: Tuesday, April 9, 2024 20:08 To: dev@kafka.apache.org Subject: Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

Hi Bruno, Bill, First of all, thanks a lot for all your useful comments.

> 1. and 2.
> I am wondering whether we should expose the processor node ID -- which
> basically is the processor node name -- in the ProcessingContext
> interface. I think the processor node ID fits well in the
> ProcessingContext interface since it already contains the application ID and
> task ID, and it would make the API for the handler cleaner.

That's a good point, the actual ProcessorContextImpl is already holding the current node in an attribute (currentNode), thus exposing the node ID should not be a problem. Let me sleep on it and get back to you regarding this point.

> 3.
> Could you elaborate -- maybe with an example -- when a record is in a
> state in which it cannot be serialized? This is not completely clear to me.

The Record passed to the handler is the input record to the processor. In the Kafka Streams API, it could be any POJO, e.g. with the following topology:

streamsBuilder.stream("x")
    .map((k, v) -> new KeyValue<>("foo", Pair.of("hello", "world")))
    .forEach((k, v) -> { throw new RuntimeException(); });

I would expect the handler to receive a Record<String, Pair<String, String>>.

> 4.
> Regarding the metrics, it is not entirely clear to me what the metric
> measures. Is it the number of calls to the process handler or is it the
> number of calls to the process handler that returned FAIL?
> If it is the former, I was also wondering whether it would be better to
> put the task-level metrics at INFO reporting level and remove the
> thread-level metric, similar to the dropped-records metric. You can
> always roll up the metrics to the thread level in your preferred
> monitoring system. Or do you think we end up with too many metrics?

We were thinking of the former, measuring the number of calls to the process handler. That's a good point, having the information at the task level could be beneficial. I updated the KIP to change the metric level and to clarify the wording.

> 5.
> What do you think about naming the handler ProcessingExceptionHandler
> instead of ProcessExceptionHandler?
> The DeserializationExceptionHandler and the ProductionExceptionHandler
> also use the noun of the action in their name and not the verb.

Good catch, I updated the KIP to rename it ProcessingExceptionHandler.

> 6.
> What record is exactly passed to the handler?
> Is it the input record to the task? Is it the input record to the
> processor node? Is it the input record to the processor?

The input record of the processor. I assume that is the most user friendly record in t
Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing
Hi Damien, Sebastien and Loic, Thanks for the KIP, this is a much-needed addition. I like the approach of getting the plumbing in for handling processor errors, allowing users to implement more complex solutions as needed. Overall, where the KIP is now LGTM, modulo outstanding comments. I think adding the example you included in this thread to the KIP is a great idea. Regarding the metrics, I'm thinking along the same lines as Bruno. I'm wondering if we can make do with a task-level metric at the INFO level and the processor metric at DEBUG. IMHO, when it comes to tracking exceptions in processing, these two areas are where users will want to focus; higher-level metrics wouldn't be as useful in this case. Thanks, Bill

On Tue, Apr 9, 2024 at 6:54 AM Bruno Cadonna wrote: > Hi again, > > I have additional questions/comments. > > 6. > What record is exactly passed to the handler? > Is it the input record to the task? Is it the input record to the > processor node? Is it the input record to the processor? > > 7. > Could you please add the packages of the Java classes/interfaces/enums > you want to add? > > Best, > Bruno > > On 4/9/24 10:17 AM, Bruno Cadonna wrote: > > Hi Loïc, Damien, and Sébastien, > > Thanks for the KIP! > > I find it really great that you contribute back to Kafka Streams > > concepts you developed for kstreamplify so that everybody can take > > advantage of your improvements. > > I have a couple of questions/comments: > > 1. and 2. > > I am wondering whether we should expose the processor node ID -- which > > basically is the processor node name -- in the ProcessingContext > > interface. I think the processor node ID fits well in the > > ProcessingContext interface since it already contains the application ID and > > task ID, and it would make the API for the handler cleaner. > > 3. > > Could you elaborate -- maybe with an example -- when a record is in a > > state in which it cannot be serialized?
Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing
Hi again, I have additional questions/comments.

6. What record is exactly passed to the handler? Is it the input record to the task? Is it the input record to the processor node? Is it the input record to the processor?

7. Could you please add the packages of the Java classes/interfaces/enums you want to add?

Best, Bruno

On 4/9/24 10:17 AM, Bruno Cadonna wrote: Hi Loïc, Damien, and Sébastien, Thanks for the KIP! I find it really great that you contribute back to Kafka Streams concepts you developed for kstreamplify so that everybody can take advantage of your improvements. I have a couple of questions/comments:

1. and 2. I am wondering whether we should expose the processor node ID -- which basically is the processor node name -- in the ProcessingContext interface. I think the processor node ID fits well in the ProcessingContext interface since it already contains the application ID and task ID, and it would make the API for the handler cleaner.

3. Could you elaborate -- maybe with an example -- when a record is in a state in which it cannot be serialized? This is not completely clear to me.

4. Regarding the metrics, it is not entirely clear to me what the metric measures. Is it the number of calls to the process handler or is it the number of calls to the process handler that returned FAIL? If it is the former, I was also wondering whether it would be better to put the task-level metrics at INFO reporting level and remove the thread-level metric, similar to the dropped-records metric. You can always roll up the metrics to the thread level in your preferred monitoring system. Or do you think we end up with too many metrics?

5. What do you think about naming the handler ProcessingExceptionHandler instead of ProcessExceptionHandler? The DeserializationExceptionHandler and the ProductionExceptionHandler also use the noun of the action in their name and not the verb.

Best, Bruno

On 4/8/24 3:48 PM, Sebastien Viale wrote: Thanks for your review! All the points make sense for us!
We updated the KIP for points 1 and 4.

2/ We followed the DeserializationExceptionHandler interface signature; it was not on our mind that the record would be forwarded with the ProcessorContext. The ProcessingContext is sufficient, we do expect that most people would need to access the RecordMetadata.

3/ The use of Record<Object, Object> is required, as the error could occur in the middle of a processor where records could be non-serializable objects. As it is a global error catch, the user may need little information about the faulty record. Assuming that users want to apply some specific treatment to the record, they can add a try / catch block in the topology. It is up to users to cast the record value and key in the implementation of the ProcessorExceptionHandler.

Cheers Loïc, Damien and Sébastien

From: Sophie Blee-Goldman Sent: Saturday, April 6, 2024 01:08 To: dev@kafka.apache.org Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

Hi Damien, First off thanks for the KIP, this is definitely a much-needed feature. On the whole it seems pretty straightforward and I am in favor of the proposal. Just a few questions and suggestions here and there:

1. One of the #handle method's parameters is "ProcessorNode node", but ProcessorNode is an internal class (and would expose a lot of internals that we probably don't want to pass in to an exception handler). Would it be sufficient to just make this a String and pass in the processor name?

2. Another of the parameters is the ProcessorContext.
This would enable the handler to potentially forward records, which imo should not be done from the handler since it could only ever call #forward but not direct where the record is actually forwarded to, and could cause confusion if users aren't aware that the handler is effectively calling from the context of the processor that threw the exception.

2a. If you don't explicitly want the ability to forward records, I would suggest changing the type of this parameter to ProcessingContext, which has all the metadata and useful info of the ProcessorContext but without the forwarding APIs. This would also let us sidestep the following issue:

2b. If you *do* want the ability to forward records, setting aside whether that in and of itself makes sense to do, we would need to pass in either a regular ProcessorContext or a FixedKeyProcessorContext, depending on what kind of processor it is. I'm not quite sure how we could design a clean API here, so I'll hold off until you clarify whether you even want forwarding or not. We would also need to split the input record into a Record vs FixedKeyRecord

3. One notable difference between th
Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing
Hi Loïc, Damien, and Sébastien, Thanks for the KIP! I find it really great that you contribute back to Kafka Streams concepts you developed for kstreamplify so that everybody can take advantage of your improvements. I have a couple of questions/comments:

1. and 2. I am wondering whether we should expose the processor node ID -- which basically is the processor node name -- in the ProcessingContext interface. I think the processor node ID fits well in the ProcessingContext interface since it already contains the application ID and task ID, and it would make the API for the handler cleaner.

3. Could you elaborate -- maybe with an example -- when a record is in a state in which it cannot be serialized? This is not completely clear to me.

4. Regarding the metrics, it is not entirely clear to me what the metric measures. Is it the number of calls to the process handler or is it the number of calls to the process handler that returned FAIL? If it is the former, I was also wondering whether it would be better to put the task-level metrics at INFO reporting level and remove the thread-level metric, similar to the dropped-records metric. You can always roll up the metrics to the thread level in your preferred monitoring system. Or do you think we end up with too many metrics?

5. What do you think about naming the handler ProcessingExceptionHandler instead of ProcessExceptionHandler? The DeserializationExceptionHandler and the ProductionExceptionHandler also use the noun of the action in their name and not the verb.

Best, Bruno

On 4/8/24 3:48 PM, Sebastien Viale wrote: Thanks for your review! All the points make sense for us!

We updated the KIP for points 1 and 4.

2/ We followed the DeserializationExceptionHandler interface signature; it was not on our mind that the record would be forwarded with the ProcessorContext. The ProcessingContext is sufficient, we do expect that most people would need to access the RecordMetadata.
3/ The use of Record<Object, Object> is required, as the error could occur in the middle of a processor where records could be non-serializable objects. As it is a global error catch, the user may need little information about the faulty record. Assuming that users want to apply some specific treatment to the record, they can add a try / catch block in the topology. It is up to users to cast the record value and key in the implementation of the ProcessorExceptionHandler.

Cheers Loïc, Damien and Sébastien

From: Sophie Blee-Goldman Sent: Saturday, April 6, 2024 01:08 To: dev@kafka.apache.org Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

Hi Damien, First off thanks for the KIP, this is definitely a much-needed feature. On the whole it seems pretty straightforward and I am in favor of the proposal. Just a few questions and suggestions here and there:

1. One of the #handle method's parameters is "ProcessorNode node", but ProcessorNode is an internal class (and would expose a lot of internals that we probably don't want to pass in to an exception handler). Would it be sufficient to just make this a String and pass in the processor name?

2. Another of the parameters is the ProcessorContext. This would enable the handler to potentially forward records, which imo should not be done from the handler since it could only ever call #forward but not direct where the record is actually forwarded to, and could cause confusion if users aren't aware that the handler is effectively calling from the context of the processor that threw the exception.

2a. If you don't explicitly want the ability to forward records, I would suggest changing the type of this parameter to ProcessingContext, which has all the metadata and useful info of the ProcessorContext but without the forwarding APIs.
This would also lets us sidestep the following issue: 2b. If you *do* want the ability to forward records, setting aside whether that in of itself makes sense to do, we would need to pass in either a regular ProcessorContext or a FixedKeyProcessorContext, depending on what kind of processor it is. I'm not quite sure how we could design a clean API here, so I'll hold off until you clarify whether you even want forwarding or not. We would also need to split the input record into a Record vs FixedKeyRecord 3. One notable difference between this handler and the existing ones you pointed out, the Deserialization/ProductionExceptionHandler, is that the records passed in to those are in serialized bytes, whereas the record here would be POJOs. You account for this by making the parameter type a Record, but I just wonder how users would be able to read the key/value and figure out what type it should be. For example, would they need to maintain
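The casting pattern described in point 3 above can be sketched as follows. This is a minimal, self-contained illustration: Record, ProcessHandlerResponse, and Animal here are simplified stand-ins invented for the example, not the actual Kafka Streams classes proposed in the KIP.

```java
// Sketch of the "cast the record value in the handler" pattern from point 3.
// All types are simplified stand-ins; the real handler would receive the
// Kafka Streams Record and context objects instead.
public class CastingHandlerSketch {

    // Stand-in for org.apache.kafka.streams.processor.api.Record.
    static final class Record<K, V> {
        private final K key;
        private final V value;
        Record(K key, V value) { this.key = key; this.value = value; }
        K key() { return key; }
        V value() { return value; }
    }

    enum ProcessHandlerResponse { CONTINUE, FAIL }

    // Hypothetical domain type, for illustration only.
    static final class Animal {
        final String name;
        Animal(String name) { this.name = name; }
    }

    // The handler sees the record as Record<Object, Object>; since any
    // processor in the topology may have thrown, it must check the runtime
    // type before using the value.
    static ProcessHandlerResponse handle(Record<Object, Object> record, Exception e) {
        if (record.value() instanceof Animal) {
            Animal animal = (Animal) record.value();
            System.out.println("Failed record for animal: " + animal.name);
        }
        return ProcessHandlerResponse.CONTINUE;
    }

    public static void main(String[] args) {
        Record<Object, Object> r = new Record<>("key-1", new Animal("otter"));
        System.out.println(handle(r, new RuntimeException("boom")));
    }
}
```

The instanceof check is the price of a single global handler: type information is erased to Object, so recovery logic that depends on the concrete record type must re-establish it at runtime.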
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing
Hi,

To complete Sébastien's answer on point 3, here is an example of how users could simply cast the record key or value, based on a custom process exception handler:

Properties streamProps = new Properties();
streamProps.put(StreamsConfig.DEFAULT_PROCESS_EXCEPTION_HANDLER_CLASS_CONFIG,
        CustomProcessExceptionHandler.class);

public class CustomProcessExceptionHandler implements ProcessExceptionHandler {
    @Override
    public ProcessHandlerResponse handle(ProcessingContext context, String nodeName,
                                         Record<Object, Object> record, Exception exception) {
        log.info("Error in node: {}, key: {}, value: {}, exception: {}",
                nodeName, record.key(), record.value(), exception);

        if (record.value() instanceof Animal) {
            Animal value = (Animal) record.value();
            // Do something
        }

        return ProcessHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

The example will be added to the KIP.

Regards,
Sébastien, Damien and Loïc
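For contrast, the "try/catch block in the topology" alternative mentioned in point 3 can be sketched with a plain function standing in for a processor's process() method. The point of the contrast: inside the processor the record types are known statically, so no casting is needed. All names below (Order, the quantity rule) are illustrative assumptions, not part of the KIP.

```java
// Local recovery inside a processor, as an alternative to the global handler.
// process() stands in for the body of a Kafka Streams Processor#process method.
public class LocalTryCatchSketch {

    // Hypothetical domain type, for illustration only.
    static final class Order {
        final String id;
        final int quantity;
        Order(String id, int quantity) { this.id = id; this.quantity = quantity; }
    }

    static String process(Order order) {
        try {
            if (order.quantity <= 0) {
                throw new IllegalArgumentException("non-positive quantity: " + order.quantity);
            }
            return "processed " + order.id;
        } catch (IllegalArgumentException e) {
            // Record-specific recovery with full static type information.
            return "skipped " + order.id + " (" + e.getMessage() + ")";
        }
    }

    public static void main(String[] args) {
        System.out.println(process(new Order("a-1", 3)));
        System.out.println(process(new Order("a-2", 0)));
    }
}
```

This prints "processed a-1" and then "skipped a-2 (non-positive quantity: 0)". The global handler and the local try/catch are complementary: the former is a last-resort safety net, the latter handles errors the processor author anticipated.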
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing
Hi Damien,

First off, thanks for the KIP, this is definitely a much-needed feature. On the whole it seems pretty straightforward and I am in favor of the proposal. Just a few questions and suggestions here and there:

1. One of the #handle method's parameters is "ProcessorNode node", but ProcessorNode is an internal class (and would expose a lot of internals that we probably don't want to pass in to an exception handler). Would it be sufficient to just make this a String and pass in the processor name?

2. Another of the parameters is the ProcessorContext. This would enable the handler to potentially forward records, which imo should not be done from the handler since it could only ever call #forward but not direct where the record is actually forwarded to, and could cause confusion if users aren't aware that the handler is effectively calling from the context of the processor that threw the exception.

2a. If you don't explicitly want the ability to forward records, I would suggest changing the type of this parameter to ProcessingContext, which has all the metadata and useful info of the ProcessorContext but without the forwarding APIs. This would also let us sidestep the following issue:

2b. If you *do* want the ability to forward records, setting aside whether that in and of itself makes sense to do, we would need to pass in either a regular ProcessorContext or a FixedKeyProcessorContext, depending on what kind of processor it is. I'm not quite sure how we could design a clean API here, so I'll hold off until you clarify whether you even want forwarding or not. We would also need to split the input record into a Record vs FixedKeyRecord.

3. One notable difference between this handler and the existing ones you pointed out, the Deserialization/ProductionExceptionHandler, is that the records passed in to those are in serialized bytes, whereas the record here would be POJOs. You account for this by making the parameter type a Record, but I just wonder how users would be able to read the key/value and figure out what type it should be. For example, would they need to maintain a map from processor name to input record types? If you could provide an example of this new feature in the KIP, it would be very helpful in understanding whether we could do something to make it easier for users to use, or if it would be fine as-is.

4. We should include all the relevant info for a new metric, such as the metric group and recording level. You can look at other metrics KIPs like KIP-444 and KIP-613 for an example. I suspect you intend for this to be in the processor group and at the INFO level?

Hope that all makes sense! Thanks again for the KIP

-Sophie

On Fri, Mar 29, 2024 at 6:16 AM Damien Gasparina wrote:

> Hi everyone,
>
> After writing quite a few Kafka Streams applications, my colleagues and I
> just created KIP-1033 to introduce a new exception handler in Kafka Streams
> to simplify error handling.
> This feature would allow defining an exception handler to automatically
> catch exceptions occurring during the processing of a message.
>
> KIP link:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
>
> Feedback and suggestions are welcome,
>
> Cheers,
> Damien, Sebastien and Loic
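Sophie's point 2a, that a handler only needs read-only metadata, can be sketched as follows. ProcessingContext and RecordMetadata here are simplified stand-ins for the Kafka Streams interfaces of the same names, trimmed to the accessors relevant to the example; note there is deliberately no forward() method, which is the point of preferring ProcessingContext over ProcessorContext.

```java
// Sketch of a metadata-only view for the exception handler: the handler can
// describe where a failure happened but cannot forward records downstream.
import java.util.Optional;

public class MetadataOnlyHandlerSketch {

    // Stand-in for the Kafka Streams RecordMetadata interface.
    interface RecordMetadata {
        String topic();
        int partition();
        long offset();
    }

    // Stand-in read-only context: metadata accessors, no forwarding APIs.
    interface ProcessingContext {
        String applicationId();
        Optional<RecordMetadata> recordMetadata();
    }

    // RecordMetadata is Optional because some records (e.g. from punctuators)
    // have no upstream topic position.
    static String describeFailure(ProcessingContext context, Exception e) {
        String where = context.recordMetadata()
                .map(m -> m.topic() + "-" + m.partition() + "@" + m.offset())
                .orElse("unknown");
        return "app=" + context.applicationId() + " at " + where + ": " + e.getMessage();
    }

    public static void main(String[] args) {
        ProcessingContext ctx = new ProcessingContext() {
            public String applicationId() { return "my-app"; }
            public Optional<RecordMetadata> recordMetadata() {
                return Optional.of(new RecordMetadata() {
                    public String topic() { return "orders"; }
                    public int partition() { return 2; }
                    public long offset() { return 42L; }
                });
            }
        };
        System.out.println(describeFailure(ctx, new RuntimeException("boom")));
    }
}
```

Because the context type carries no #forward method, the compiler rather than documentation enforces that handlers stay side-effect-free with respect to the topology, which is the design benefit Sophie describes.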