Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-05-13 Thread Damien Gasparina
Thanks for the feedback. I think we should keep two separate callbacks
for serialization errors and other production errors. It makes sense for
type safety (ProducerRecord<byte[], byte[]> vs. POJO) and also for backward
compatibility. On top of that, all metadata provided in the #handle
method would need to be held in memory until the producer invokes its
callback; in the future, having two callbacks might avoid confusion, as
some metadata might be provided to #handle and not to
#handleSerializationException. I do think that one method would be
cleaner, but for backward compatibility, type safety, and memory
reasons, we should keep two separate callbacks.

As you suggested, Sophie, I updated the KIP to introduce a
SerializationExceptionOrigin enum and added the "origin" parameter to
the #handleSerializationException method.
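
For clarity, the updated callbacks would then look roughly like this (a
sketch of the shape only, not the final KIP wording):

public interface ProductionExceptionHandler extends Configurable {

    enum SerializationExceptionOrigin {
        KEY,
        VALUE
    }

    // Production errors: the record has already been serialized.
    ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                              final ProducerRecord<byte[], byte[]> record,
                                              final Exception exception);

    // Serialization errors: the record still carries the POJO key/value, and
    // origin says whether the key or the value failed to serialize.
    ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context,
                                                                    final ProducerRecord record,
                                                                    final Exception exception,
                                                                    final SerializationExceptionOrigin origin);
}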

On Sat, 11 May 2024 at 07:30, Sophie Blee-Goldman  wrote:
>
> Whoops, just noticed there is already a voting thread for this. Hard to
> keep track with all the KIPs going on right now!
>
> In that case I'll just wait for the SerializationExceptionOrigin thing to
> be added and then I'll vote. Should definitely be able to make 3.8 in this
> case :D
>
> On Fri, May 10, 2024 at 10:10 PM Sophie Blee-Goldman 
> wrote:
>
> > Sounds like we're more or less in agreement here. I think the KIP just
> > needs one small update still, which is the SerializationExceptionOrigin
> > enum.
> >
> > As discussed there are a few options for this and we're all happy to defer
> > to the preference of the KIP authors, but if we keep the KIP as-is with the
> > two separate methods in the ProductionExceptionHandler, then imo it makes
> > the most sense to add the SerializationExceptionOrigin enum to the
> > ProductionExceptionHandler interface and then add an "origin" parameter to
> > the new  #handleSerializationException method. However you decide to do it,
> > I'm personally happy to vote on this KIP once the KIP is updated.
> >
> >  Just FYI the 3.8 KIP freeze is this upcoming Wednesday, so if you guys
> > would like to target 3.8 for this feature, just make sure to update the KIP
> > and kick off a [VOTE] thread by EOD Monday so that you can close the vote
> > by EOD Wednesday (since it has to be open for 72 hours).
> >
> > Thanks again for this sorely needed feature!
> >
> > On Fri, May 10, 2024 at 10:06 AM Bill Bejeck  wrote:
> >
> >> Great KIP discussion so far by everyone.
> >> At this point, I'm in agreement with the direction and current state of
> >> the
> >> KIP.
> >>
> >> As for having two separate callbacks for the ProductionExceptionHandler,
> >> I'm somewhat split in that I agree with points raised by Sophie and
> >> Matthias with my final
> >> position being to maintain both callbacks.  IMHO, while there are several
> >> things that could go wrong with producing a message, it seems that
> >> serialization exceptions would be the most common, although I don't have
> >> any data to back that opinion up.  But having said that, should the KIP
> >> authors decide otherwise, I would be fine with that approach as well.
> >>
> >> I'm at the point where I'm comfortable voting for this KIP.
> >>
> >> Thanks,
> >> Bill
> >>
> >> On Thu, May 9, 2024 at 4:28 PM Sophie Blee-Goldman  >> >
> >> wrote:
> >>
> >> > The type safety issue is definitely not solved by having two separate
> >> > callbacks. I just think it gets a bit worse by mashing them into one
> >> > method. At least in the plain #handle method you can be sure that the type
> >> > is ProducerRecord<byte[], byte[]> and in #handleSerializationException the type is
> >> > some POJO.
> >> >
> >> > And in theory you can just embed the mapping of sink topics to
> >> type/Serde
> >> > based on your topology. Or let's say your output record keys & values
> >> are
> >> > all Strings, and you want to print the String representation in your
> >> > handler, rather than the bytes.
> >> > Having a separate callback means you know you can simply print the
> >> > ProducerRecord's key/value in the #handleSerializationException method, and that
> >> > you will have to use a StringDeserializer to convert the key/value to its String
> >> > form to print it in the #handle method.
> >> >
> >> > Again, I just feel this will be more straightforward and easy for users
> >> to
> >> > use correctly, but am satisfied either way. I'll shut up now and wait
> >> for
> >> > the KIP authors to make a call on this one way or another, and then I'm
> >> > happy to cast my vote
> >> >
> >> > On Thu, May 9, 2024 at 10:15 AM Matthias J. Sax 
> >> wrote:
> >> >
> >> > > Thanks Sophie! Makes it much clearer where you are coming from.
> >> > >
> >> > > About the Type unsafety: isn't this also an issue for the
> >> > > `handleSerializationException` case, because the handler is used for all
> >> > > sink topics, and thus key/value types are not really known w/o taking the
> >> > > sink topic into account? -- So I am not sure if having two handler
> >> > > methods really helps much with regard to type safety?
> >> > >
> >> > > Just want to make this small comment for completeness.

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-05-10 Thread Sophie Blee-Goldman
Whoops, just noticed there is already a voting thread for this. Hard to
keep track with all the KIPs going on right now!

In that case I'll just wait for the SerializationExceptionOrigin thing to
be added and then I'll vote. Should definitely be able to make 3.8 in this
case :D

On Fri, May 10, 2024 at 10:10 PM Sophie Blee-Goldman 
wrote:

> Sounds like we're more or less in agreement here. I think the KIP just
> needs one small update still, which is the SerializationExceptionOrigin
> enum.
>
> As discussed there are a few options for this and we're all happy to defer
> to the preference of the KIP authors, but if we keep the KIP as-is with the
> two separate methods in the ProductionExceptionHandler, then imo it makes
> the most sense to add the SerializationExceptionOrigin enum to the
> ProductionExceptionHandler interface and then add an "origin" parameter to
> the new  #handleSerializationException method. However you decide to do it,
> I'm personally happy to vote on this KIP once the KIP is updated.
>
>  Just FYI the 3.8 KIP freeze is this upcoming Wednesday, so if you guys
> would like to target 3.8 for this feature, just make sure to update the KIP
> and kick off a [VOTE] thread by EOD Monday so that you can close the vote
> by EOD Wednesday (since it has to be open for 72 hours).
>
> Thanks again for this sorely needed feature!
>
> On Fri, May 10, 2024 at 10:06 AM Bill Bejeck  wrote:
>
>> Great KIP discussion so far by everyone.
>> At this point, I'm in agreement with the direction and current state of
>> the
>> KIP.
>>
>> As for having two separate callbacks for the ProductionExceptionHandler,
>> I'm somewhat split in that I agree with points raised by Sophie and
>> Matthias with my final
>> position being to maintain both callbacks.  IMHO, while there are several
>> things that could go wrong with producing a message, it seems that
>> serialization exceptions would be the most common, although I don't have
>> any data to back that opinion up.  But having said that, should the KIP
>> authors decide otherwise, I would be fine with that approach as well.
>>
>> I'm at the point where I'm comfortable voting for this KIP.
>>
>> Thanks,
>> Bill
>>
>> On Thu, May 9, 2024 at 4:28 PM Sophie Blee-Goldman > >
>> wrote:
>>
>> > The type safety issue is definitely not solved by having two separate
>> > callbacks. I just think it gets a bit worse by mashing them into one
>> > method. At least in the plain #handle method you can be sure that the type
>> > is ProducerRecord<byte[], byte[]> and in #handleSerializationException the type is
>> > some POJO.
>> >
>> > And in theory you can just embed the mapping of sink topics to
>> type/Serde
>> > based on your topology. Or let's say your output record keys & values
>> are
>> > all Strings, and you want to print the String representation in your
>> > handler, rather than the bytes.
>> > Having a separate callback means you know you can simply print the
>> > ProducerRecord's key/value in the #handleSerializationException method, and that
>> > you will have to use a StringDeserializer to convert the key/value to its String
>> > form to print it in the #handle method.
>> >
>> > Again, I just feel this will be more straightforward and easy for users
>> to
>> > use correctly, but am satisfied either way. I'll shut up now and wait
>> for
>> > the KIP authors to make a call on this one way or another, and then I'm
>> > happy to cast my vote
>> >
>> > On Thu, May 9, 2024 at 10:15 AM Matthias J. Sax 
>> wrote:
>> >
>> > > Thanks Sophie! Makes it much clearer where you are coming from.
>> > >
>> > > About the Type unsafety: isn't this also an issue for the
>> > > `handleSerializationException` case, because the handler is used for all
>> > > sink topics, and thus key/value types are not really known w/o taking the
>> > > sink topic into account? -- So I am not sure if having two handler
>> > > methods really helps much with regard to type safety?
>> > >
>> > > Just want to make this small comment for completeness. Let's hear what
>> > > others think. Given that we both don't have a strong opinion but just
>> a
>> > > personal preference, we should be able to come to a conclusion quickly
>> > > and get this KIP approved for 3.8 :)
>> > >
>> > >
>> > > -Matthias
>> > >
>> > > On 5/8/24 3:12 PM, Sophie Blee-Goldman wrote:
>> > > > Well I definitely don't feel super strongly about it, and more
>> > > importantly,
>> > > > I'm not a user. So I will happily defer to the preference of anyone
>> who
>> > > > will actually be using this feature. But  I'll explain my reasoning:
>> > > >
>> > > > There *is* a relevant distinction between these two callbacks --
>> > because
>> > > > the passed-in record will have a different type depending on
>> whether it
>> > > was
>> > > > a serialization exception or something else. Even if we combined
>> them
>> > > into
>> > > > a single #handle method, users will still end up implementing two distinct
>> > > > branches depending on whether it was a serialization exception or not,
>> > > > since that determines the type of the ProducerRecord passed in.

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-05-10 Thread Sophie Blee-Goldman
Sounds like we're more or less in agreement here. I think the KIP just
needs one small update still, which is the SerializationExceptionOrigin
enum.

As discussed there are a few options for this and we're all happy to defer
to the preference of the KIP authors, but if we keep the KIP as-is with the
two separate methods in the ProductionExceptionHandler, then imo it makes
the most sense to add the SerializationExceptionOrigin enum to the
ProductionExceptionHandler interface and then add an "origin" parameter to
the new  #handleSerializationException method. However you decide to do it,
I'm personally happy to vote on this KIP once the KIP is updated.

 Just FYI the 3.8 KIP freeze is this upcoming Wednesday, so if you guys
would like to target 3.8 for this feature, just make sure to update the KIP
and kick off a [VOTE] thread by EOD Monday so that you can close the vote
by EOD Wednesday (since it has to be open for 72 hours).

Thanks again for this sorely needed feature!

On Fri, May 10, 2024 at 10:06 AM Bill Bejeck  wrote:

> Great KIP discussion so far by everyone.
> At this point, I'm in agreement with the direction and current state of the
> KIP.
>
> As for having two separate callbacks for the ProductionExceptionHandler,
> I'm somewhat split in that I agree with points raised by Sophie and
> Matthias with my final
> position being to maintain both callbacks.  IMHO, while there are several
> things that could go wrong with producing a message, it seems that
> serialization exceptions would be the most common, although I don't have
> any data to back that opinion up.  But having said that, should the KIP
> authors decide otherwise, I would be fine with that approach as well.
>
> I'm at the point where I'm comfortable voting for this KIP.
>
> Thanks,
> Bill
>
> On Thu, May 9, 2024 at 4:28 PM Sophie Blee-Goldman 
> wrote:
>
> > The type safety issue is definitely not solved by having two separate
> > callbacks. I just think it gets a bit worse by mashing them into one
> > method. At least in the plain #handle method you can be sure that the type
> > is ProducerRecord<byte[], byte[]> and in #handleSerializationException the type is
> > some POJO.
> >
> > And in theory you can just embed the mapping of sink topics to type/Serde
> > based on your topology. Or let's say your output record keys & values are
> > all Strings, and you want to print the String representation in your
> > handler, rather than the bytes.
> > Having a separate callback means you know you can simply print the
> > ProducerRecord's key/value in the #handleSerializationException method, and that
> > you will have to use a StringDeserializer to convert the key/value to its String
> > form to print it in the #handle method.
> >
> > Again, I just feel this will be more straightforward and easy for users
> to
> > use correctly, but am satisfied either way. I'll shut up now and wait for
> > the KIP authors to make a call on this one way or another, and then I'm
> > happy to cast my vote
> >
> > On Thu, May 9, 2024 at 10:15 AM Matthias J. Sax 
> wrote:
> >
> > > Thanks Sophie! Makes it much clearer where you are coming from.
> > >
> > > About the Type unsafety: isn't this also an issue for the
> > > `handleSerializationException` case, because the handler is used for all
> > > sink topics, and thus key/value types are not really known w/o taking the
> > > sink topic into account? -- So I am not sure if having two handler
> > > methods really helps much with regard to type safety?
> > >
> > > Just want to make this small comment for completeness. Let's hear what
> > > others think. Given that we both don't have a strong opinion but just a
> > > personal preference, we should be able to come to a conclusion quickly
> > > and get this KIP approved for 3.8 :)
> > >
> > >
> > > -Matthias
> > >
> > > On 5/8/24 3:12 PM, Sophie Blee-Goldman wrote:
> > > > Well I definitely don't feel super strongly about it, and more
> > > importantly,
> > > > I'm not a user. So I will happily defer to the preference of anyone
> who
> > > > will actually be using this feature. But  I'll explain my reasoning:
> > > >
> > > > There *is* a relevant distinction between these two callbacks --
> > because
> > > > the passed-in record will have a different type depending on whether
> it
> > > was
> > > > a serialization exception or something else. Even if we combined them
> > > into
> > > > a single #handle method, users will still end up implementing two
> > > distinct
> > > > branches depending on whether it was a serialization exception or
> not,
> > > > since that determines the type of the ProducerRecord passed in.
> > > >
> > > > Not to mention they'll need to cast it to a ProducerRecord<byte[], byte[]>
> > > > when we could have just passed it in as this type via a dedicated callback.
> > > > And note that because of the generics, they can't do an instanceof check to
> > > > make sure that the record type is ProducerRecord<byte[], byte[]> and will
> > > > have to suppress the "unchecked cast" warning.

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-05-10 Thread Bill Bejeck
Great KIP discussion so far by everyone.
At this point, I'm in agreement with the direction and current state of the
KIP.

As for having two separate callbacks for the ProductionExceptionHandler,
I'm somewhat split in that I agree with points raised by Sophie and
Matthias with my final
position being to maintain both callbacks.  IMHO, while there are several
things that could go wrong with producing a message, it seems that
serialization exceptions would be the most common, although I don't have
any data to back that opinion up.  But having said that, should the KIP
authors decide otherwise, I would be fine with that approach as well.

I'm at the point where I'm comfortable voting for this KIP.

Thanks,
Bill

On Thu, May 9, 2024 at 4:28 PM Sophie Blee-Goldman 
wrote:

> The type safety issue is definitely not solved by having two separate
> callbacks. I just think it gets a bit worse by mashing them into one
> method. At least in the plain #handle method you can be sure that the type
> is ProducerRecord<byte[], byte[]> and in #handleSerializationException the type is
> some POJO.
>
> And in theory you can just embed the mapping of sink topics to type/Serde
> based on your topology. Or let's say your output record keys & values are
> all Strings, and you want to print the String representation in your
> handler, rather than the bytes.
> Having a separate callback means you know you can simply print the
> ProducerRecord's key/value in the #handleSerializationException method, and that
> you will have to use a StringDeserializer to convert the key/value to its String
> form to print it in the #handle method.
>
> Again, I just feel this will be more straightforward and easy for users to
> use correctly, but am satisfied either way. I'll shut up now and wait for
> the KIP authors to make a call on this one way or another, and then I'm
> happy to cast my vote
>
> On Thu, May 9, 2024 at 10:15 AM Matthias J. Sax  wrote:
>
> > Thanks Sophie! Makes it much clearer where you are coming from.
> >
> > About the Type unsafety: isn't this also an issue for the
> > `handleSerializationException` case, because the handler is used for all
> > sink topics, and thus key/value types are not really known w/o taking the
> > sink topic into account? -- So I am not sure if having two handler
> > methods really helps much with regard to type safety?
> >
> > Just want to make this small comment for completeness. Let's hear what
> > others think. Given that we both don't have a strong opinion but just a
> > personal preference, we should be able to come to a conclusion quickly
> > and get this KIP approved for 3.8 :)
> >
> >
> > -Matthias
> >
> > On 5/8/24 3:12 PM, Sophie Blee-Goldman wrote:
> > > Well I definitely don't feel super strongly about it, and more
> > importantly,
> > > I'm not a user. So I will happily defer to the preference of anyone who
> > > will actually be using this feature. But  I'll explain my reasoning:
> > >
> > > There *is* a relevant distinction between these two callbacks --
> because
> > > the passed-in record will have a different type depending on whether it
> > was
> > > a serialization exception or something else. Even if we combined them
> > into
> > > a single #handle method, users will still end up implementing two
> > distinct
> > > branches depending on whether it was a serialization exception or not,
> > > since that determines the type of the ProducerRecord passed in.
> > >
> > > Not to mention they'll need to cast it to a ProducerRecord<byte[], byte[]>
> > > when we could have just passed it in as this type via a dedicated callback.
> > > And note that because of the generics, they can't do an instanceof check to
> > > make sure that the record type is ProducerRecord<byte[], byte[]> and will
> > > have to suppress the "unchecked cast" warning.
> > >
> > > So if we combined the two callbacks, their handler will look something
> > like
> > > this:
> > >
> > > @SuppressWarnings("unchecked")
> > > public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
> > >                                                  final ProducerRecord record,
> > >                                                  final Exception exception) {
> > >     if (exception instanceof RecordSerializationException) {
> > >         if (((RecordSerializationException) exception).origin().equals(SerializationExceptionOrigin.KEY)) {
> > >             log.error("Failed to serialize key", exception);
> > >         } else {
> > >             log.error("Failed to serialize value", exception);
> > >         }
> > >     } else {
> > >         final ProducerRecord<byte[], byte[]> serializedRecord = (ProducerRecord<byte[], byte[]>) record;
> > >         log.error("Failed to produce record with serialized key={} and serialized value={}",
> > >                 serializedRecord.key(), serializedRecord.value());
> > >     }
> > >     return ProductionExceptionHandlerResponse.FAIL;
> > > }
> > >
> > > That seems like the most basic case, and it still has distinct logic
> > > even if they ultimately handle exceptions the same way. And looking
> > forward
> > > to KIP-1034: Dead-letter queues, it seems all the more likely that the
> > > actual handling response might be different depending on whether it's a
> > > serialization exception or not.

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-05-09 Thread Sophie Blee-Goldman
The type safety issue is definitely not solved by having two separate
callbacks. I just think it gets a bit worse by mashing them into one
method. At least in the plain #handle method you can be sure that the type
is ProducerRecord<byte[], byte[]> and in #handleSerializationException the type is
some POJO.

And in theory you can just embed the mapping of sink topics to type/Serde
based on your topology. Or let's say your output record keys & values are
all Strings, and you want to print the String representation in your
handler, rather than the bytes.
Having a separate callback means you know you can simply print the
ProducerRecord's key/value in the #handleSerializationException method, and that
you will have to use a StringDeserializer to convert the key/value to its String
form to print it in the #handle method.
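
To make that concrete, a String-key/value handler in the combined-#handle
world would look roughly like this (a sketch under that assumption; the
signature shape is taken from this discussion, not the final KIP):

// The serialized bytes must be deserialized again just to log them.
private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();

public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                 final ProducerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
    final String key = STRING_DESERIALIZER.deserialize(record.topic(), record.key());
    final String value = STRING_DESERIALIZER.deserialize(record.topic(), record.value());
    log.error("Failed to produce record with key={} and value={}", key, value, exception);
    return ProductionExceptionHandlerResponse.FAIL;
}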

Again, I just feel this will be more straightforward and easy for users to
use correctly, but am satisfied either way. I'll shut up now and wait for
the KIP authors to make a call on this one way or another, and then I'm
happy to cast my vote

On Thu, May 9, 2024 at 10:15 AM Matthias J. Sax  wrote:

> Thanks Sophie! Makes it much clearer where you are coming from.
>
> About the Type unsafety: isn't this also an issue for the
> `handleSerializationException` case, because the handler is used for all
> sink topics, and thus key/value types are not really known w/o taking the
> sink topic into account? -- So I am not sure if having two handler
> methods really helps much with regard to type safety?
>
> Just want to make this small comment for completeness. Let's hear what
> others think. Given that we both don't have a strong opinion but just a
> personal preference, we should be able to come to a conclusion quickly
> and get this KIP approved for 3.8 :)
>
>
> -Matthias
>
> On 5/8/24 3:12 PM, Sophie Blee-Goldman wrote:
> > Well I definitely don't feel super strongly about it, and more
> importantly,
> > I'm not a user. So I will happily defer to the preference of anyone who
> > will actually be using this feature. But  I'll explain my reasoning:
> >
> > There *is* a relevant distinction between these two callbacks -- because
> > the passed-in record will have a different type depending on whether it
> was
> > a serialization exception or something else. Even if we combined them
> into
> > a single #handle method, users will still end up implementing two
> distinct
> > branches depending on whether it was a serialization exception or not,
> > since that determines the type of the ProducerRecord passed in.
> >
> > Not to mention they'll need to cast it to a ProducerRecord<byte[], byte[]>
> > when we could have just passed it in as this type via a dedicated callback.
> > And note that because of the generics, they can't do an instanceof check to
> > make sure that the record type is ProducerRecord<byte[], byte[]> and will
> > have to suppress the "unchecked cast" warning.
> >
> > So if we combined the two callbacks, their handler will look something
> like
> > this:
> >
> > @SuppressWarnings("unchecked")
> > public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
> >                                                  final ProducerRecord record,
> >                                                  final Exception exception) {
> >     if (exception instanceof RecordSerializationException) {
> >         if (((RecordSerializationException) exception).origin().equals(SerializationExceptionOrigin.KEY)) {
> >             log.error("Failed to serialize key", exception);
> >         } else {
> >             log.error("Failed to serialize value", exception);
> >         }
> >     } else {
> >         final ProducerRecord<byte[], byte[]> serializedRecord = (ProducerRecord<byte[], byte[]>) record;
> >         log.error("Failed to produce record with serialized key={} and serialized value={}",
> >                 serializedRecord.key(), serializedRecord.value());
> >     }
> >     return ProductionExceptionHandlerResponse.FAIL;
> > }
> >
> > That seems like the most basic case, and it still has distinct logic
> > even if they ultimately handle exceptions the same way. And looking
> forward
> > to KIP-1034: Dead-letter queues, it seems all the more likely that the
> > actual handling response might be different depending on whether it's a
> > serialization exception or not: a serialized record can probably be
> > retried/sent to a DLQ, whereas a record that can't be serialized should
> not
> > (can't, really) be forwarded to a DLQ. So if they're going to have
> > completely different implementations depending on whether it's a
> > serialization exception, why not just give them two separate callbacks?
> >
> > And that's all assuming the user is perfectly aware of the different
> > exception types and their implications for the type of the
> ProducerRecord.
> > Many people might just miss the existence of the
> > RecordSerializationException altogether --
> > there are already so many different exception types, ESPECIALLY when it
> > comes to the Producer. Not to mention they'll need to understand the
> > nuances of how the ProducerRecord type changes depending on the type of
> > exception that's passed in. And on top of all that, they'll need to know
> > that there is metadata stored in the RecordSerializationException regarding
> > the origin of the error.

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-05-09 Thread Matthias J. Sax

Thanks Sophie! Makes it much clearer where you are coming from.

About the Type unsafety: isn't this also an issue for the 
`handleSerializationException` case, because the handler is used for all 
sink topics, and thus key/value types are not really known w/o taking the 
sink topic into account? -- So I am not sure if having two handler 
methods really helps much with regard to type safety?


Just want to make this small comment for completeness. Let's hear what 
others think. Given that we both don't have a strong opinion but just a 
personal preference, we should be able to come to a conclusion quickly 
and get this KIP approved for 3.8 :)



-Matthias

On 5/8/24 3:12 PM, Sophie Blee-Goldman wrote:

Well I definitely don't feel super strongly about it, and more importantly,
I'm not a user. So I will happily defer to the preference of anyone who
will actually be using this feature. But  I'll explain my reasoning:

There *is* a relevant distinction between these two callbacks -- because
the passed-in record will have a different type depending on whether it was
a serialization exception or something else. Even if we combined them into
a single #handle method, users will still end up implementing two distinct
branches depending on whether it was a serialization exception or not,
since that determines the type of the ProducerRecord passed in.

Not to mention they'll need to cast it to a ProducerRecord<byte[], byte[]>
when we could have just passed it in as this type via a dedicated callback.
And note that because of the generics, they can't do an instanceof check to
make sure that the record type is ProducerRecord<byte[], byte[]> and will
have to suppress the "unchecked cast" warning.

So if we combined the two callbacks, their handler will look something like
this:

@SuppressWarnings("unchecked")
public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                 final ProducerRecord record,
                                                 final Exception exception) {
    if (exception instanceof RecordSerializationException) {
        if (((RecordSerializationException) exception).origin().equals(SerializationExceptionOrigin.KEY)) {
            log.error("Failed to serialize key", exception);
        } else {
            log.error("Failed to serialize value", exception);
        }
    } else {
        final ProducerRecord<byte[], byte[]> serializedRecord = (ProducerRecord<byte[], byte[]>) record;
        log.error("Failed to produce record with serialized key={} and serialized value={}",
                serializedRecord.key(), serializedRecord.value());
    }
    return ProductionExceptionHandlerResponse.FAIL;
}

That seems like the most basic case, and it still has distinct logic
even if they ultimately handle exceptions the same way. And looking forward
to KIP-1034: Dead-letter queues, it seems all the more likely that the
actual handling response might be different depending on whether it's a
serialization exception or not: a serialized record can probably be
retried/sent to a DLQ, whereas a record that can't be serialized should not
(can't, really) be forwarded to a DLQ. So if they're going to have
completely different implementations depending on whether it's a
serialization exception, why not just give them two separate callbacks?

And that's all assuming the user is perfectly aware of the different
exception types and their implications for the type of the ProducerRecord.
Many people might just miss the existence of the
RecordSerializationException altogether --
there are already so many different exception types, ESPECIALLY when it
comes to the Producer. Not to mention they'll need to understand the
nuances of how the ProducerRecord type changes depending on the type of
exception that's passed in. And on top of all that, they'll need to know
that there is metadata stored in the RecordSerializationException regarding
the origin of the error. Whereas if we just passed in the
SerializationExceptionOrigin to a #handleSerializationException callback, well,
that's pretty impossible to miss.

That all just seems like a lot for most people to have to understand to
implement a ProductionExceptionHandler, which imo is not at all an advanced
feature and should be as straightforward and easy to use as possible.

Lastly -- I don't think it's quite fair to compare this to the
RecordDeserializationException. We have a dedicated handler that's just for
deserialization exceptions specifically, hence there's no worry about users
having to be aware of the different exception types they might have to deal
with in the DeserializationExceptionHandler. Whereas serialization
exceptions are just a subset of what might get passed in to the
ProductionExceptionHandler...

Just explaining my reasoning -- in the end I leave it up to the KIP authors
and anyone who will actually be using this feature in their applications :)



On Tue, May 7, 2024 at 8:35 PM Matthias J. Sax  wrote:


@Loic, yes, what you describe is exactly what I had in mind.



@Sophie, can you elaborate a little bit?


First of all, I agree that it makes sense to maintain the two separate
callbacks for the ProductionExceptionHandler, since one of them is
specifically for serialization exceptions while the other is used for
everything/anything else.

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-05-08 Thread Sophie Blee-Goldman
Well I definitely don't feel super strongly about it, and more importantly,
I'm not a user. So I will happily defer to the preference of anyone who
will actually be using this feature. But  I'll explain my reasoning:

There *is* a relevant distinction between these two callbacks -- because
the passed-in record will have a different type depending on whether it was
a serialization exception or something else. Even if we combined them into
a single #handle method, users will still end up implementing two distinct
branches depending on whether it was a serialization exception or not,
since that determines the type of the ProducerRecord passed in.

Not to mention they'll need to cast it to a ProducerRecord<byte[], byte[]>
when we could have just passed it in as this type via a dedicated callback.
And note that because of the generics, they can't do an instanceof check to
make sure that the record type is ProducerRecord<byte[], byte[]> and will
have to suppress the "unchecked cast" warning.

So if we combined the two callbacks, their handler will look something like
this:

@SuppressWarnings("unchecked")
public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                 final ProducerRecord record,
                                                 final Exception exception) {
    if (exception instanceof RecordSerializationException) {
        if (((RecordSerializationException) exception).origin().equals(SerializationExceptionOrigin.KEY)) {
            log.error("Failed to serialize key", exception);
        } else {
            log.error("Failed to serialize value", exception);
        }
    } else {
        final ProducerRecord<byte[], byte[]> serializedRecord = (ProducerRecord<byte[], byte[]>) record;
        log.error("Failed to produce record with serialized key={} and serialized value={}",
                serializedRecord.key(), serializedRecord.value());
    }
    return ProductionExceptionHandlerResponse.FAIL;
}

That seems like the most basic case, and it still has distinct logic
even if they ultimately handle exceptions the same way. And looking forward
to KIP-1034: Dead-letter queues, it seems all the more likely that the
actual handling response might be different depending on whether it's a
serialization exception or not: a serialized record can probably be
retried/sent to a DLQ, whereas a record that can't be serialized should not
(can't, really) be forwarded to a DLQ. So if they're going to have
completely different implementations depending on whether it's a
serialization exception, why not just give them two separate callbacks?
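
For contrast, a rough sketch of the same logic with two dedicated callbacks
(assuming the separate #handleSerializationException with the origin
parameter discussed elsewhere in this thread) needs no casts at all:

public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                 final ProducerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
    // No instanceof check or unchecked cast: the record is guaranteed serialized here.
    log.error("Failed to produce record with serialized key={} and serialized value={}",
            record.key(), record.value(), exception);
    return ProductionExceptionHandlerResponse.FAIL;
}

public ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context,
                                                                       final ProducerRecord record,
                                                                       final Exception exception,
                                                                       final SerializationExceptionOrigin origin) {
    log.error("Failed to serialize {}", origin == SerializationExceptionOrigin.KEY ? "key" : "value", exception);
    return ProductionExceptionHandlerResponse.FAIL;
}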

And that's all assuming the user is perfectly aware of the different
exception types and their implications for the type of the ProducerRecord.
Many people might just miss the existence of the
RecordSerializationException altogether --
there are already so many different exception types, ESPECIALLY when it
comes to the Producer. Not to mention they'll need to understand the
nuances of how the ProducerRecord type changes depending on the type of
exception that's passed in. And on top of all that, they'll need to know
that there is metadata stored in the RecordSerializationException regarding
the origin of the error. Whereas if we just passed in the
SerializationExceptionOrigin to a #handleSerializationException callback, well,
that's pretty impossible to miss.

That all just seems like a lot for most people to have to understand to
implement a ProductionExceptionHandler, which imo is not at all an advanced
feature and should be as straightforward and easy to use as possible.

Lastly -- I don't think it's quite fair to compare this to the
RecordDeserializationException. We have a dedicated handler that's just for
deserialization exceptions specifically, hence there's no worry about users
having to be aware of the different exception types they might have to deal
with in the DeserializationExceptionHandler. Whereas serialization
exceptions are just a subset of what might get passed in to the
ProductionExceptionHandler...

Just explaining my reasoning -- in the end I leave it up to the KIP authors
and anyone who will actually be using this feature in their applications :)



On Tue, May 7, 2024 at 8:35 PM Matthias J. Sax  wrote:

> @Loic, yes, what you describe is exactly what I had in mind.
>
>
>
> @Sophie, can you elaborate a little bit?
>
> > First of all, I agree that it makes sense to maintain the two separate
> > callbacks for the ProductionExceptionHandler, since one of them is
> > specifically for serialization exceptions while the other is used for
> > everything/anything else.
>
> What makes a serialization exception special compared to other errors
> that it's valuable to treat it differently? Why can we put "everything
> else" into a single bucket? By your train of thought, should we not split
> out the "everything else" bucket into a different callback method for
> every different error? If no, why not, but only for serialization errors?
>
>  From what I remember, historically, we added the
> ProductionExceptionHandler, and kinda just missed the serialization
> error case. And later, when we extended the handler, we just could not
> re-use the existing callback as it was typed with `<byte[], byte[]>`, and
> it would have been an incompatible change; so it was rather a workaround
> to add the second method to the handler, but not really intended design?

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-05-07 Thread Matthias J. Sax

@Loic, yes, what you describe is exactly what I had in mind.



@Sophie, can you elaborate a little bit?


First of all, I agree that it makes sense to maintain the two separate
callbacks for the ProductionExceptionHandler, since one of them is
specifically for serialization exceptions while the other is used for
everything/anything else.


What makes a serialization exception special compared to other errors 
that it's valuable to treat it differently? Why can we put "everything 
else" into a single bucket? By your train of thought, should we not split 
out the "everything else" bucket into a different callback method for 
every different error? If no, why not, but only for serialization errors?


From what I remember, historically, we added the 
ProductionExceptionHandler, and kinda just missed the serialization 
error case. And later, when we extended the handler, we just could not 
re-use the existing callback as it was typed with `<byte[], byte[]>`, and 
it would have been an incompatible change; so it was rather a workaround 
to add the second method to the handler, but not really intended design?



It's of course only my personal opinion that I believe a single callback 
method is simpler/cleaner compared to sticking with two, and adding the 
new exception type to make it backward compatible seems worth it. It 
also kinda introduces the same pattern we use elsewhere (cf. KIP-1036), 
which I actually think is an argument for introducing 
`RecordSerializationException`, to unify user experience across the board.


Would be great to hear from others about this point. It's not that I 
strongly object to having two methods, and I would not block this KIP on 
this question.




-Matthias


On 5/7/24 3:40 PM, Sophie Blee-Goldman wrote:

First of all, I agree that it makes sense to maintain the two separate
callbacks for the ProductionExceptionHandler, since one of them is
specifically for serialization exceptions while the other is used for
everything/anything else. I also think we can take advantage of this fact
to simplify things a bit and cut down on the amount of new stuff added to
the API by just adding a parameter to the #handleSerializationException
callback and using that to pass in the SerializationExceptionOrigin enum to
distinguish between key vs value. This way we wouldn't need to introduce
yet another exception type (the RecordSerializationException) just to pass
in this information.

Thoughts?

On Tue, May 7, 2024 at 8:33 AM Loic Greffier 
wrote:


Hi Matthias,

To sum up the proposed changes to the ProductionExceptionHandler
callback methods (106):

A new method ProductionExceptionHandler#handle is added with the following
signature:


ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                          final ProducerRecord record,
                                          final Exception exception);

The ProducerRecord parameter has changed to accept a serialized or
non-serialized record.
Thus, the new ProductionExceptionHandler#handle method can handle both
production exception and serialization exception.

Both old ProductionExceptionHandler#handle and
ProductionExceptionHandler#handleSerializationException methods are now
deprecated.
The old ProductionExceptionHandler#handle method gets a default
implementation, so users do not have to implement a deprecated method.

To handle backward compatibility, the new
ProductionExceptionHandler#handle method gets a default implementation.


default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                  final ProducerRecord record,
                                                  final Exception exception) {
    if (exception instanceof RecordSerializationException) {
        return this.handleSerializationException(record, (Exception) exception.getCause());
    }

    return handle((ProducerRecord<byte[], byte[]>) record, exception);
}


The default implementation either invokes #handleSerializationException or
#handle depending on the type of the exception, thus users still relying on
deprecated ProductionExceptionHandler#handle
or ProductionExceptionHandler#handleSerializationException custom
implementations won't break.

The new ProductionExceptionHandler#handle method is now invoked in case of
serialization exception:


public <K, V> void send(final String topic, final K key, final V value, ...) {
    try {
        keyBytes = keySerializer.serialize(topic, headers, key);
        ...
    } catch (final ClassCastException exception) {
        ...
    } catch (final Exception exception) {
        try {
            response = productionExceptionHandler.handle(context, record,
                new RecordSerializationException(SerializationExceptionOrigin.KEY, exception));
        } catch (final Exception e) {
            ...
        }
    }
}


To wrap the origin serialization exception and determine whether it comes
from the key or the value, a new exception class is created:


public class RecordSerializationException extends SerializationException {

    public enum SerializationExceptionOrigin {
        KEY,
        VALUE
    }

    public RecordSerializationException(SerializationExceptionOrigin origin, final Throwable cause);
}

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-05-07 Thread Sophie Blee-Goldman
First of all, I agree that it makes sense to maintain the two separate
callbacks for the ProductionExceptionHandler, since one of them is
specifically for serialization exceptions while the other is used for
everything/anything else. I also think we can take advantage of this fact
to simplify things a bit and cut down on the amount of new stuff added to
the API by just adding a parameter to the #handleSerializationException
callback and using that to pass in the SerializationExceptionOrigin enum to
distinguish between key vs value. This way we wouldn't need to introduce
yet another exception type (the RecordSerializationException) just to pass
in this information.

Thoughts?

On Tue, May 7, 2024 at 8:33 AM Loic Greffier 
wrote:

> Hi Matthias,
>
> To sum up the proposed changes to the ProductionExceptionHandler
> callback methods (106):
>
> A new method ProductionExceptionHandler#handle is added with the following
> signature:
>
> > ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
> >                                           final ProducerRecord record, final Exception exception);
>
> The ProducerRecord parameter has changed to accept a serialized or
> non-serialized record.
> Thus, the new ProductionExceptionHandler#handle method can handle both
> production exception and serialization exception.
>
> Both old ProductionExceptionHandler#handle and
> ProductionExceptionHandler#handleSerializationException methods are now
> deprecated.
> The old ProductionExceptionHandler#handle method gets a default
> implementation, so users do not have to implement a deprecated method.
>
> To handle backward compatibility, the new
> ProductionExceptionHandler#handle method gets a default implementation.
>
> > default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
> >                                                   final ProducerRecord record,
> >                                                   final Exception exception) {
> >     if (exception instanceof RecordSerializationException) {
> >         return this.handleSerializationException(record, (Exception) exception.getCause());
> >     }
> >
> >     return handle((ProducerRecord<byte[], byte[]>) record, exception);
> > }
>
> The default implementation either invokes #handleSerializationException or
> #handle depending on the type of the exception, thus users still relying on
> deprecated ProductionExceptionHandler#handle
> or ProductionExceptionHandler#handleSerializationException custom
> implementations won't break.
>
> The new ProductionExceptionHandler#handle method is now invoked in case of
> serialization exception:
>
> > public <K, V> void send(final String topic, final K key, final V value, ...) {
> >     try {
> >         keyBytes = keySerializer.serialize(topic, headers, key);
> >         ...
> >     } catch (final ClassCastException exception) {
> >         ...
> >     } catch (final Exception exception) {
> >         try {
> >             response = productionExceptionHandler.handle(context, record,
> >                 new RecordSerializationException(SerializationExceptionOrigin.KEY, exception));
> >         } catch (final Exception e) {
> >             ...
> >         }
> >     }
> > }
>
> To wrap the origin serialization exception and determine whether it comes
> from the key or the value, a new exception class is created:
>
> > public class RecordSerializationException extends SerializationException {
> >     public enum SerializationExceptionOrigin {
> >         KEY,
> >         VALUE
> >     }
> >
> >     public RecordSerializationException(SerializationExceptionOrigin origin, final Throwable cause);
> > }
>
> Hope it all makes sense,
> Loïc
>


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-05-03 Thread Matthias J. Sax
What about (106) to unify both existing callback methods of 
`ProductionExceptionHandler` into a single one, instead of adding two 
new ones?


Damien's last reply about it was:


I will think about unifying, I do agree it would be cleaner.


There was no follow-up on this question, and the KIP right now still 
proposes to add two new methods, which I believe we could (should?) 
unify to:


default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                  final ProducerRecord record,
                                                  final Exception exception) {


I.e., we drop the generics `<byte[], byte[]>` on `ProducerRecord`, which 
allows you to also pass in a non-serialized ProducerRecord of any type 
for the serialization error case.


Btw: wondering if we also want to pass in a flag/enum about key vs value 
serialization error similar to what was proposed in KIP-1036? The only 
"oddity" would be, that we call the handler other error cases, too, not 
just for serialization exceptions. But we wculd tackle this by 
introducing a new class `RecordSerializationException` which would 
include the flag and would ensure that KS hands this exception into the 
handler. This would keep the handler interface/method itself clean.



-Matthias




On 5/3/24 2:15 AM, Loic Greffier wrote:

Hi Bruno,

Good catch, KIP has been updated accordingly.

Regards,
Loïc


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-05-03 Thread Loic Greffier
Hi Bruno, 

Good catch, KIP has been updated accordingly.

Regards,
Loïc


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-05-03 Thread Bruno Cadonna

Hi,

the KIP looks great!

public static final String PROCESS_EXCEPTION_HANDLER_CLASS_CONFIG = 
    "process.exception.handler";


needs to be changed to

public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG = 
    "processing.exception.handler";


The name of the constant has been already corrected in the code block 
but the actual name of the config (i.e., the content of the constant) 
has not been changed yet.
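
For example, once the content of the constant is fixed, configuring the
handler would look like this (a sketch assuming the constant lives on
StreamsConfig; MyProcessingExceptionHandler is a user-supplied class):

props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,  // "processing.exception.handler"
          MyProcessingExceptionHandler.class);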



Best,
Bruno


On 5/3/24 10:35 AM, Sebastien Viale wrote:

Hi,
So, we all agree to revert to the regular Headers interface in 
ErrorHandlerContext.
We will update the KIP accordingly.
@Sophie => Yes, this is the last remaining question, and it has been open for 
voting since last week.
Thanks

Sébastien

From: Andrew Schofield 
Sent: Friday, 3 May 2024 06:44
To: dev@kafka.apache.org 
Subject: Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for 
exceptions occurring during processing


Hi,
I’ve changed my mind on this one having read through the comments.

I don’t think the exception handler should be able to mess with the headers
to the detriment of the code that called the handler.

While I like the hygiene of having an ImmutableHeaders interface,
I feel we can use the existing interface to get the effect we desire.

Thanks,
Andrew





On 3 May 2024, at 03:40, Sophie Blee-Goldman  wrote:

I tend to agree that we should just return a pure Headers instead of
introducing a new class/interface to protect against overwriting them. I think a
pretty good case has been made already so I won't add onto it, just wanted
to voice my support.

Is that the only remaining question on this KIP? Might be ok to move to a
vote now?

On Wed, May 1, 2024 at 8:05 AM Lianet M.  wrote:


Hi all, thanks Damien for the KIP!

After looking into the KIP and comments, my only concern is aligned with
one of Matthias' comments, around the ImmutableHeaders introduction, with
the motivation not being clear enough. The existing handlers already expose
the headers (indirectly). Ex.
ProductionExceptionHandler.handleSerializationException provides the
ProducerRecord as an argument, so they are already exposed in those
callbacks through record.headers(). Is there a reason to think that it
would be a problem to expose the headers in the
new ProcessingExceptionHandler, but that it's not a problem for the
existing handler?

If there is no real concern about the KS engine requiring those headers, it
feels hard to mentally justify the complexity we transfer to the user by
exposing a new concept into the callbacks to represent the headers. In the
end, it strays away from the simple/consistent representation of Headers
used all over. Even if eventually the KS engine needs to use the headers
after the callbacks with certainty that they were not altered, still feels
like it's something we could attempt to solve internally, without having to
transfer "new concepts" into the user (ex. the deep-copy as it was
suggested, seems like the kind of trade-off that would maybe be acceptable
here to gain simplicity and consistency among the handlers with a single
existing representation of Headers).

Best!

Lianet



On Tue, Apr 30, 2024 at 9:36 PM Matthias J. Sax  wrote:


Thanks for the update.

I am wondering if we should use `ReadOnlyHeaders` instead of
`ImmutableHeaders` as interface name?

Also, the returned `Header` interface is technically not immutable
either, because `Header#value()` returns a mutable byte-array... Would we
need a `ReadOnlyHeader` interface?

If yes, it seems that `ReadOnlyHeaders` should not be a super-interface
of `Headers` but it would rather be a standalone interface, and a
wrapper for a `Headers` instance? And `ReadOnlyHeader` would return some
immutable type instead of `byte[]` for the value()?

An alternative would be to deep-copy the value byte-array, which would not
be free, but given that we are talking about exception handling, it
would not be on the hot code path, and thus might be acceptable?


The above seems to increase the complexity significantly though. Hence,
I have second thoughts on the immutability question:

Do we really need to worry about mutability after all, because in the
end, KS runtime won't read the Headers instance after the handler was
called, and if a user modifies the passed in headers, there won't be any
actual damage (ie, no side effects)? For this case, it might even be ok
to also not add `ImmutableHeaders` to begin with?



Sorry for the forth and back (yes, forth and back, because back and
forth does not make sense -- it's not logical -- just trying to fix
English :D) as I did bring up the immutability question in the first
place...

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-05-02 Thread Andrew Schofield
Hi,
I’ve changed my mind on this one having read through the comments.

I don’t think the exception handler should be able to mess with the headers
to the detriment of the code that called the handler.

While I like the hygiene of having an ImmutableHeaders interface,
I feel we can use the existing interface to get the effect we desire.

Thanks,
Andrew

> On 3 May 2024, at 03:40, Sophie Blee-Goldman  wrote:
> 
> I tend to agree that we should just return a pure Headers instead of
> introducing a new class/interface to protect against overwriting them. I think a
> pretty good case has been made already so I won't add onto it, just wanted
> to voice my support.
> 
> Is that the only remaining question on this KIP? Might be ok to move to a
> vote now?
> 
> On Wed, May 1, 2024 at 8:05 AM Lianet M.  wrote:
> 
>> Hi all, thanks Damien for the KIP!
>> 
>> After looking into the KIP and comments, my only concern is aligned with
>> one of Matthias' comments, around the ImmutableHeaders introduction, with
>> the motivation not being clear enough. The existing handlers already expose
>> the headers (indirectly). Ex.
>> ProductionExceptionHandler.handleSerializationException provides the
>> ProducerRecord as an argument, so they are already exposed in those
>> callbacks through record.headers(). Is there a reason to think that it
>> would be a problem to expose the headers in the
>> new ProcessingExceptionHandler, but that it's not a problem for the
>> existing handler?
>> 
>> If there is no real concern about the KS engine requiring those headers, it
>> feels hard to mentally justify the complexity we transfer to the user by
>> exposing a new concept into the callbacks to represent the headers. In the
>> end, it strays away from the simple/consistent representation of Headers
>> used all over. Even if eventually the KS engine needs to use the headers
>> after the callbacks with certainty that they were not altered, still feels
>> like it's something we could attempt to solve internally, without having to
>> transfer "new concepts" into the user (ex. the deep-copy as it was
>> suggested, seems like the kind of trade-off that would maybe be acceptable
>> here to gain simplicity and consistency among the handlers with a single
>> existing representation of Headers).
>> 
>> Best!
>> 
>> Lianet
>> 
>> 
>> 
>> On Tue, Apr 30, 2024 at 9:36 PM Matthias J. Sax  wrote:
>> 
>>> Thanks for the update.
>>> 
>>> I am wondering if we should use `ReadOnlyHeaders` instead of
>>> `ImmutableHeaders` as interface name?
>>> 
>>> Also, the returned `Header` interface is technically not immutable
>>> either, because `Header#value()` returns a mutable byte-array... Would we
>>> need a `ReadOnlyHeader` interface?
>>> 
>>> If yes, it seems that `ReadOnlyHeaders` should not be a super-interface
>>> of `Headers` but it would rather be a standalone interface, and a
>>> wrapper for a `Headers` instance? And `ReadOnlyHeader` would return some
>>> immutable type instead of `byte[]` for the value()?
>>> 
>>> An alternative would be to deep-copy the value byte-array, which would not
>>> be free, but given that we are talking about exception handling, it
>>> would not be on the hot code path, and thus might be acceptable?
>>> 
>>> 
>>> The above seems to increase the complexity significantly though. Hence,
>>> I have second thoughts on the immutability question:
>>> 
>>> Do we really need to worry about mutability after all, because in the
>>> end, KS runtime won't read the Headers instance after the handler was
>>> called, and if a user modifies the passed in headers, there won't be any
>>> actual damage (ie, no side effects)? For this case, it might even be ok
>>> to also not add `ImmutableHeaders` to begin with?
>>> 
>>> 
>>> 
>>> Sorry for the forth and back (yes, forth and back, because back and
>>> forth does not make sense -- it's not logical -- just trying to fix
>>> English :D) as I did bring up the immutability question in the first
>>> place...
>>> 
>>> 
>>> 
>>> -Matthias
>>> 
>>> On 4/25/24 5:56 AM, Loic Greffier wrote:
 Hi Matthias,
 
 I have updated the KIP regarding points 103 and 108.
 
 103.
 I have suggested a new `ImmutableHeaders` interface to deal with the
 immutability concern of the headers, which is basically the `Headers`
 interface without the write accesses.
 
 public interface ImmutableHeaders {
     Header lastHeader(String key);
     Iterable<Header> headers(String key);
     Header[] toArray();
 }
 
 The `Headers` interface can be updated accordingly:
 
 public interface Headers extends ImmutableHeaders, Iterable<Header> {
 //…
 }
 
 Loïc
>>> 
>> 



Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-05-02 Thread Sophie Blee-Goldman
I tend to agree that we should just return a pure Headers instead of
introducing a new class/interface to protect against overwriting them. I think a
pretty good case has been made already so I won't add onto it, just wanted
to voice my support.

Is that the only remaining question on this KIP? Might be ok to move to a
vote now?

On Wed, May 1, 2024 at 8:05 AM Lianet M.  wrote:

> Hi all, thanks Damien for the KIP!
>
> After looking into the KIP and comments, my only concern is aligned with
> one of Matthias' comments, around the ImmutableHeaders introduction, with
> the motivation not being clear enough. The existing handlers already expose
> the headers (indirectly). Ex.
> ProductionExceptionHandler.handleSerializationException provides the
> ProducerRecord as an argument, so they are already exposed in those
> callbacks through record.headers(). Is there a reason to think that it
> would be a problem to expose the headers in the
> new ProcessingExceptionHandler, but that it's not a problem for the
> existing handler?
>
> If there is no real concern about the KS engine requiring those headers, it
> feels hard to mentally justify the complexity we transfer to the user by
> exposing a new concept into the callbacks to represent the headers. In the
> end, it strays away from the simple/consistent representation of Headers
> used all over. Even if eventually the KS engine needs to use the headers
> after the callbacks with certainty that they were not altered, still feels
> like it's something we could attempt to solve internally, without having to
> transfer "new concepts" into the user (ex. the deep-copy as it was
> suggested, seems like the kind of trade-off that would maybe be acceptable
> here to gain simplicity and consistency among the handlers with a single
> existing representation of Headers).
>
> Best!
>
> Lianet
>
>
>
> On Tue, Apr 30, 2024 at 9:36 PM Matthias J. Sax  wrote:
>
> > Thanks for the update.
> >
> > I am wondering if we should use `ReadOnlyHeaders` instead of
> > `ImmutableHeaders` as interface name?
> >
> > Also, the returned `Header` interface is technically not immutable
> > either, because `Header#value()` returns a mutable byte-array... Would we
> > need a `ReadOnlyHeader` interface?
> >
> > If yes, it seems that `ReadOnlyHeaders` should not be a super-interface
> > of `Headers` but it would rather be a standalone interface, and a
> > wrapper for a `Headers` instance? And `ReadOnlyHeader` would return some
> > immutable type instead of `byte[]` for the value()?
> >
> > An alternative would be to deep-copy the value byte-array, which would not
> > be free, but given that we are talking about exception handling, it
> > would not be on the hot code path, and thus might be acceptable?
> >
> >
> > The above seems to increase the complexity significantly though. Hence,
> > I have second thoughts on the immutability question:
> >
> > Do we really need to worry about mutability after all, because in the
> > end, KS runtime won't read the Headers instance after the handler was
> > called, and if a user modifies the passed in headers, there won't be any
> > actual damage (ie, no side effects)? For this case, it might even be ok
> > to also not add `ImmutableHeaders` to begin with?
> >
> >
> >
> > Sorry for the forth and back (yes, forth and back, because back and
> > forth does not make sense -- it's not logical -- just trying to fix
> > English :D) as I did bring up the immutability question in the first
> > place...
> >
> >
> >
> > -Matthias
> >
> > On 4/25/24 5:56 AM, Loic Greffier wrote:
> > > Hi Matthias,
> > >
> > > I have updated the KIP regarding points 103 and 108.
> > >
> > > 103.
> > > I have suggested a new `ImmutableHeaders` interface to deal with the
> > > immutability concern of the headers, which is basically the `Headers`
> > > interface without the write accesses.
> > >
> > > public interface ImmutableHeaders {
> > >     Header lastHeader(String key);
> > >     Iterable<Header> headers(String key);
> > >     Header[] toArray();
> > > }
> > >
> > > The `Headers` interface can be updated accordingly:
> > >
> > > public interface Headers extends ImmutableHeaders, Iterable<Header> {
> > >     //…
> > > }
> > >
> > > Loïc
> >
>


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-05-01 Thread Lianet M.
Hi all, thanks Damien for the KIP!

After looking into the KIP and comments, my only concern is aligned with
one of Matthias' comments, around the ImmutableHeaders introduction, with
the motivation not being clear enough. The existing handlers already expose
the headers (indirectly). Ex.
ProductionExceptionHandler.handleSerializationException provides the
ProducerRecord as an argument, so they are already exposed in those
callbacks through record.headers(). Is there a reason to think that it
would be a problem to expose the headers in the
new ProcessingExceptionHandler, but that it's not a problem for the
existing handler?

If there is no real concern about the KS engine requiring those headers, it
feels hard to mentally justify the complexity we transfer to the user by
exposing a new concept into the callbacks to represent the headers. In the
end, it strays away from the simple/consistent representation of Headers
used all over. Even if eventually the KS engine needs to use the headers
after the callbacks with certainty that they were not altered, still feels
like it's something we could attempt to solve internally, without having to
transfer "new concepts" into the user (ex. the deep-copy as it was
suggested, seems like the kind of trade-off that would maybe be acceptable
here to gain simplicity and consistency among the handlers with a single
existing representation of Headers).

Best!

Lianet



On Tue, Apr 30, 2024 at 9:36 PM Matthias J. Sax  wrote:

> Thanks for the update.
>
> I am wondering if we should use `ReadOnlyHeaders` instead of
> `ImmutableHeaders` as interface name?
>
> Also, the returned `Header` interface is technically not immutable
> either, because `Header#value()` returns a mutable byte-array... Would we
> need a `ReadOnlyHeader` interface?
>
> If yes, it seems that `ReadOnlyHeaders` should not be a super-interface
> of `Headers` but it would rather be a standalone interface, and a
> wrapper for a `Headers` instance? And `ReadOnlyHeader` would return some
> immutable type instead of `byte[]` for the value()?
>
> An alternative would be to deep-copy the value byte-array, which would not
> be free, but given that we are talking about exception handling, it
> would not be on the hot code path, and thus might be acceptable?
>
>
> The above seems to increase the complexity significantly though. Hence,
> I have second thoughts on the immutability question:
>
> Do we really need to worry about mutability after all, because in the
> end, KS runtime won't read the Headers instance after the handler was
> called, and if a user modifies the passed in headers, there won't be any
> actual damage (ie, no side effects)? For this case, it might even be ok
> to also not add `ImmutableHeaders` to begin with?
>
>
>
> Sorry for the forth and back (yes, forth and back, because back and
> forth does not make sense -- it's not logical -- just trying to fix
> English :D) as I did bring up the immutability question in the first
> place...
>
>
>
> -Matthias
>
> On 4/25/24 5:56 AM, Loic Greffier wrote:
> > Hi Matthias,
> >
> > I have updated the KIP regarding points 103 and 108.
> >
> > 103.
> > I have suggested a new `ImmutableHeaders` interface to deal with the
> > immutability concern of the headers, which is basically the `Headers`
> > interface without the write accesses.
> >
> > public interface ImmutableHeaders {
> >     Header lastHeader(String key);
> >     Iterable<Header> headers(String key);
> >     Header[] toArray();
> > }
> >
> > The `Headers` interface can be updated accordingly:
> >
> > public interface Headers extends ImmutableHeaders, Iterable<Header> {
> >     //…
> > }
> >
> > Loïc
>


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-30 Thread Matthias J. Sax

Thanks for the update.

I am wondering if we should use `ReadOnlyHeaders` instead of 
`ImmutableHeaders` as interface name?


Also, the returned `Header` interface is technically not immutable 
either, because `Header#value()` returns a mutable byte-array... Would we 
need a `ReadOnlyHeader` interface?


If yes, it seems that `ReadOnlyHeaders` should not be a super-interface 
of `Headers` but it would rather be a standalone interface, and a 
wrapper for a `Headers` instance? And `ReadOnlyHeader` would return some 
immutable type instead of `byte[]` for the value()?
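
For illustration, such a standalone read-only pair could look roughly like 
this (a sketch only; none of these types exist in Kafka today, and a 
read-only ByteBuffer view is just one possible immutable value type):

public interface ReadOnlyHeader {
    String key();
    // e.g. a wrapper could return ByteBuffer.wrap(header.value()).asReadOnlyBuffer()
    java.nio.ByteBuffer value();
}

public interface ReadOnlyHeaders extends Iterable<ReadOnlyHeader> {
    ReadOnlyHeader lastHeader(String key);
    Iterable<ReadOnlyHeader> headers(String key);
    ReadOnlyHeader[] toArray();
}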


An alternative would be to deep-copy the value byte-array, which would not 
be free, but given that we are talking about exception handling, it 
would not be on the hot code path, and thus might be acceptable?
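
To make the cost concrete, the deep-copy could be as small as this sketch 
(deepCopy is a hypothetical helper; RecordHeaders and RecordHeader are the 
existing implementations in org.apache.kafka.common.header.internals):

static Headers deepCopy(final Headers headers) {
    final RecordHeaders copy = new RecordHeaders();
    for (final Header header : headers) {
        final byte[] value = header.value();
        // copy each value so the handler cannot mutate the original array
        copy.add(new RecordHeader(header.key(),
            value == null ? null : Arrays.copyOf(value, value.length)));
    }
    return copy;
}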



The above seems to increase the complexity significantly though. Hence, 
I have second thoughts on the immutability question:


Do we really need to worry about mutability after all, because in the 
end, KS runtime won't read the Headers instance after the handler was 
called, and if a user modifies the passed in headers, there won't be any 
actual damage (ie, no side effects)? For this case, it might even be ok 
to also not add `ImmutableHeaders` to begin with?




Sorry for the forth and back (yes, forth and back, because back and 
forth does not make sense -- it's not logical -- just trying to fix 
English :D) as I did bring up the immutability question in the first 
place...




-Matthias

On 4/25/24 5:56 AM, Loic Greffier wrote:

Hi Matthias,

I have updated the KIP regarding points 103 and 108.

103.
I have suggested a new `ImmutableHeaders` interface to deal with the
immutability concern of the headers, which is basically the `Headers`
interface without the write accesses.

public interface ImmutableHeaders {
    Header lastHeader(String key);
    Iterable<Header> headers(String key);
    Header[] toArray();
}

The `Headers` interface can be updated accordingly:

public interface Headers extends ImmutableHeaders, Iterable<Header> {
    //…
}

Loïc


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-25 Thread Loic Greffier
Hi Matthias,

I have updated the KIP regarding points 103 and 108.

103.
I have suggested a new `ImmutableHeaders` interface to deal with the
immutability concern of the headers, which is basically the `Headers`
interface without the write accesses.

public interface ImmutableHeaders {
    Header lastHeader(String key);
    Iterable<Header> headers(String key);
    Header[] toArray();
}

The `Headers` interface can be updated accordingly:

public interface Headers extends ImmutableHeaders, Iterable<Header> {
    //…
}
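
For illustration, a handler would then get read-only access along these 
lines (the handle signature below is only a sketch, not the final KIP API; 
"trace-id" is a made-up header key and log an assumed slf4j Logger):

public ProcessingHandlerResponse handle(final ImmutableHeaders headers, final Exception exception) {
    final Header traceId = headers.lastHeader("trace-id");
    // ImmutableHeaders offers no add()/remove(), so the headers cannot be modified here
    log.warn("Failed to process record, trace-id: {}",
        traceId == null ? "n/a" : new String(traceId.value(), StandardCharsets.UTF_8), exception);
    return ProcessingHandlerResponse.CONTINUE;
}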

Loïc


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-23 Thread Damien Gasparina
isting handlers in a separate KIP, what do
> >>>>> you think?
> >>>>> Maybe I am overthinking this part and the ProcessingContext would be 
> >>>>> fine.
> >>>>>
> >>>>> 4. Good point regarding the dropped-record metric, as it is used by
> >>>>> the other handlers, I do think it makes sense to leverage it instead
> >>>>> of creating a new metric.
> >>>>> I will update the KIP to use the dropped-record metric.
> >>>>>
> >>>>> 8. Regarding the DSL, I am aligned with Bruno, I think we could close
> >>>>> the gaps in a future KIP.
> >>>>>
> >>>>> Cheers,
> >>>>> Damien
> >>>>>
> >>>>>
> >>>>> On Thu, 11 Apr 2024 at 11:56, Bruno Cadonna  wrote:
> >>>>>>
> >>>>>> Hi Matthias,
> >>>>>>
> >>>>>>
> >>>>>> 1.a
> >>>>>> With processor node ID, I mean the ID that is exposed in the tags of
> >>>>>> processor node metrics. That ID cannot be internal since it is exposed
> >>>>>> in metrics. I think the processor name and the processor node ID is the
> >>>>>> same thing. I followed how the processor node ID is set in metrics and 
> >>>>>> I
> >>>>>> ended up in addProcessor(name, ...).
> >>>>>>
> >>>>>>
> >>>>>> 1.b
> >>>>>> Regarding ProcessingContext, I also thought about a separate class to
> >>>>>> pass-in context information into the handler, but then I dismissed the
> >>>>>> idea because I thought I was overthinking it. Apparently, I was not
> >>>>>> overthinking it if you also had the same idea. So let's consider a
> >>>>>> separate class.
> >>>>>>
> >>>>>>
> >>>>>> 4.
> >>>>>> Regarding the metric, thanks for pointing to the dropped-record metric,
> >>>>>> Matthias. The dropped-record metric is used with the deserialization
> >>>>>> handler and the production handler. So, it would make sense to also use
> >>>>>> it for this handler. However, the dropped-record metric only records
> >>>>>> records that are skipped by the handler and not the number of calls to
> >>>>>> the handler. But that difference is probably irrelevant since in case 
> >>>>>> of
> >>>>>> FAIL, the metric will be reset anyways since the stream thread will be
> >>>>>> restarted. In conclusion, I think the dropped-record metric in
> >>>>>> combination with a warn log message might be the better choice than
> >>>>>> introducing a new metric.
> >>>>>>
> >>>>>>
> >>>>>> 8.
> >>>>>> Regarding the DSL, I think we should close possible gaps in a separate
> >>>>> KIP.
> >>>>>>
> >>>>>>
> >>>>>> Best,
> >>>>>> Bruno
> >>>>>>
> >>>>>> On 4/11/24 12:06 AM, Matthias J. Sax wrote:
> >>>>>>> Thanks for the KIP. Great discussion.
> >>>>>>>
> >>>>>>> I am not sure if I understand the proposal from Bruno to hand in the
> >>>>>>> processor node id? Isn't this internal (could not even find it
> >>>>> quickly).
> >>>>>>> We do have a processor name, right? Or do I mix up something?
> >>>>>>>
> >>>>>>> Another question is about `ProcessingContext` -- it contains a lot of
> >>>>>>> (potentially irrelevant?) metadata. We should think carefully about
> >>>>> what
> >>>>>>> we want to pass in and what not -- removing stuff is hard, but adding
> >>>>>>> stuff is easy. It's always an option to create a new interface that
> >>>>> only
> >>>>>>> exposes stuff we find useful, and allows us to evolve this interface
> >>>>>>> independent of others. Re-using an existing interface always has the
> >>>>>>> danger of introducing an undesired coupling that could bite us in the
> >>>>>>> future. --

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-22 Thread Matthias J. Sax
On Thu, 11 Apr 2024 at 11:56, Bruno Cadonna  wrote:


Hi Matthias,


1.a
With processor node ID, I mean the ID that is exposed in the tags of
processor node metrics. That ID cannot be internal since it is exposed
in metrics. I think the processor name and the processor node ID is the
same thing. I followed how the processor node ID is set in metrics and I
ended up in addProcessor(name, ...).


1.b
Regarding ProcessingContext, I also thought about a separate class to
pass-in context information into the handler, but then I dismissed the
idea because I thought I was overthinking it. Apparently, I was not
overthinking it if you also had the same idea. So let's consider a
separate class.


4.
Regarding the metric, thanks for pointing to the dropped-record metric,
Matthias. The dropped-record metric is used with the deserialization
handler and the production handler. So, it would make sense to also use
it for this handler. However, the dropped-record metric only records
records that are skipped by the handler and not the number of calls to
the handler. But that difference is probably irrelevant since in case of
FAIL, the metric will be reset anyways since the stream thread will be
restarted. In conclusion, I think the dropped-record metric in
combination with a warn log message might be the better choice than
introducing a new metric.


8.
Regarding the DSL, I think we should close possible gaps in a separate

KIP.



Best,
Bruno

On 4/11/24 12:06 AM, Matthias J. Sax wrote:

Thanks for the KIP. Great discussion.

I am not sure if I understand the proposal from Bruno to hand in the
processor node id? Isn't this internal (could not even find it

quickly).

We do have a processor name, right? Or do I mix up something?

Another question is about `ProcessingContext` -- it contains a lot of
(potentially irrelevant?) metadata. We should think carefully about

what

we want to pass in and what not -- removing stuff is hard, but adding
stuff is easy. It's always an option to create a new interface that

only

exposes stuff we find useful, and allows us to evolve this interface
independent of others. Re-using an existing interface always has the
danger of introducing an undesired coupling that could bite us in the
future. -- It makes total sense to pass in `RecordMetadata`, but
`ProcessingContext` (even if already limited compared to
`ProcessorContext`) still seems to be too broad? For example, there is
`getStateStore()` and `schedule()` methods which I think we should not
expose.

The other interesting question is about "what record gets passed in".
For the PAPI, passing in the Processor's input record make a lot of
sense. However, for DSL operators, I am not 100% sure? The DSL often
uses internal types not exposed to the user, and thus I am not sure if
users could write useful code for this case? -- In general, I still
agree that the handler should be implemented with a try-catch around
`Processor.process()` but it might not be too useful for DSL processors.
Hence, I am wondering if we need to do something more in the DSL? I
don't have a concrete proposal (a few high level ideas only) and if we
don't do anything special for the DSL I am ok with moving forward with
this KIP as-is, but we should be aware of potential limitations for DSL
users. We can always do a follow up KIP to close gaps when we

understand

the impact better -- covering the DSL would also expand the scope of
this KIP significantly...

About the metric: just to double check. Do we think it's worth to add a
new metric? Or could we re-use the existing "dropped record metric"?



-Matthias


On 4/10/24 5:11 AM, Sebastien Viale wrote:

Hi,

You are right, it will simplify types.

We updated the KIP

regards

Sébastien VIALE

MICHELIN GROUP - Information Technology
Technical Expert Kafka

Carmes / Bâtiment A17 4e / 63100 Clermont-Ferrand


From: Bruno Cadonna 
Sent: Wednesday, April 10, 2024 10:38
To: dev@kafka.apache.org 
Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception
handler for exceptions occuring during processing

Hi Loïc, Damien, and Sébastien,

Great that we are converging!


3.
Damien and Loïc, I think in your examples the handler will receive
Record<Object, Object> because a Record<Object, Object> is passed to
the processor in the following code line:


https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152
<
https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152




I see that we do not need to pass into the handler a
Record<byte[], byte[]> just because we do that for the

DeserializationExceptionHa

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-22 Thread Damien Gasparina
> >>>>> I am not sure if I understand the proposal from Bruno to hand in the
> >>>>> processor node id? Isn't this internal (could not even find it
> >>> quickly).
> >>>>> We do have a processor name, right? Or do I mix up something?
> >>>>>
> >>>>> Another question is about `ProcessingContext` -- it contains a lot of
> >>>>> (potentially irrelevant?) metadata. We should think carefully about
> >>> what
> >>>>> we want to pass in and what not -- removing stuff is hard, but adding
> >>>>> stuff is easy. It's always an option to create a new interface that
> >>> only
> >>>>> exposes stuff we find useful, and allows us to evolve this interface
> >>>>> independent of others. Re-using an existing interface always has the
> >>>>> danger of introducing an undesired coupling that could bite us in the
> >>>>> future. -- It makes total sense to pass in `RecordMetadata`, but
> >>>>> `ProcessingContext` (even if already limited compared to
> >>>>> `ProcessorContext`) still seems to be too broad? For example, there is
> >>>>> `getStateStore()` and `schedule()` methods which I think we should not
> >>>>> expose.
> >>>>>
> >>>>> The other interesting question is about "what record gets passed in".
> >>>>> For the PAPI, passing in the Processor's input record make a lot of
> >>>>> sense. However, for DSL operators, I am not 100% sure? The DSL often
> >>>>> uses internal types not exposed to the user, and thus I am not sure if
> >>>>> users could write useful code for this case? -- In general, I still
> >>>>> agree that the handler should be implemented with a try-catch around
> >>>>> `Processor.process()` but it might not be too useful for DSL processors.
> >>>>> Hence, I am wondering if we need to do something more in the DSL? I
> >>>>> don't have a concrete proposal (a few high level ideas only) and if we
> >>>>> don't do anything special for the DSL I am ok with moving forward with
> >>>>> this KIP as-is, but we should be aware of potential limitations for DSL
> >>>>> users. We can always do a follow up KIP to close gaps when we
> >>> understand
> >>>>> the impact better -- covering the DSL would also expand the scope of
> >>>>> this KIP significantly...
> >>>>>
> >>>>> About the metric: just to double check. Do we think it's worth to add a
> >>>>> new metric? Or could we re-use the existing "dropped record metric"?
> >>>>>
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>>
> >>>>> On 4/10/24 5:11 AM, Sebastien Viale wrote:
> >>>>>> Hi,
> >>>>>>
> >>>>>> You are right, it will simplify types.
> >>>>>>
> >>>>>> We updated the KIP
> >>>>>>
> >>>>>> regards
> >>>>>>
> >>>>>> Sébastien VIALE
> >>>>>>
> >>>>>> MICHELIN GROUP - Information Technology
> >>>>>> Technical Expert Kafka
> >>>>>>
> >>>>>> Carmes / Bâtiment A17 4e / 63100 Clermont-Ferrand
> >>>>>>
> >>>>>> From: Bruno Cadonna 
> >>>>>> Sent: Wednesday, April 10, 2024 10:38
> >>>>>> To: dev@kafka.apache.org 
> >>>>>> Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception
> >>>>>> handler for exceptions occuring during processing
> >>>>>>
> >>>>>> Hi Loïc, Damien, and Sébastien,
> >>>>>>
> >>>>>> Great that we are converging!
> >>>>>>
> >>>>>>
> >>>>>> 3.
> >>>>>> Damien and Loïc, I think in your examples the handler will receive
> >>>>>> Record<Object, Object> because a Record<Object, Object> is passe

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-22 Thread Bruno Cadonna
 name and the processor node ID is the
same thing. I followed how the processor node ID is set in metrics and I
ended up in addProcessor(name, ...).


1.b
Regarding ProcessingContext, I also thought about a separate class to
pass-in context information into the handler, but then I dismissed the
idea because I thought I was overthinking it. Apparently, I was not
overthinking it if you also had the same idea. So let's consider a
separate class.


4.
Regarding the metric, thanks for pointing to the dropped-record metric,
Matthias. The dropped-record metric is used with the deserialization
handler and the production handler. So, it would make sense to also use
it for this handler. However, the dropped-record metric only records
records that are skipped by the handler and not the number of calls to
the handler. But that difference is probably irrelevant since in case of
FAIL, the metric will be reset anyways since the stream thread will be
restarted. In conclusion, I think the dropped-record metric in
combination with a warn log message might be the better choice than
introducing a new metric.


8.
Regarding the DSL, I think we should close possible gaps in a separate

KIP.



Best,
Bruno

On 4/11/24 12:06 AM, Matthias J. Sax wrote:

Thanks for the KIP. Great discussion.

I am not sure if I understand the proposal from Bruno to hand in the
processor node id? Isn't this internal (could not even find it

quickly).

We do have a processor name, right? Or do I mix up something?

Another question is about `ProcessingContext` -- it contains a lot of
(potentially irrelevant?) metadata. We should think carefully about

what

we want to pass in and what not -- removing stuff is hard, but adding
stuff is easy. It's always an option to create a new interface that

only

exposes stuff we find useful, and allows us to evolve this interface
independent of others. Re-using an existing interface always has the
danger of introducing an undesired coupling that could bite us in the
future. -- It makes total sense to pass in `RecordMetadata`, but
`ProcessingContext` (even if already limited compared to
`ProcessorContext`) still seems to be too broad? For example, there is
`getStateStore()` and `schedule()` methods which I think we should not
expose.

The other interesting question is about "what record gets passed in".
For the PAPI, passing in the Processor's input record make a lot of
sense. However, for DSL operators, I am not 100% sure? The DSL often
uses internal types not exposed to the user, and thus I am not sure if
users could write useful code for this case? -- In general, I still
agree that the handler should be implemented with a try-catch around
`Processor.process()` but it might not be too useful for DSL processors.
Hence, I am wondering if we need to do something more in the DSL? I
don't have a concrete proposal (a few high level ideas only) and if we
don't do anything special for the DSL I am ok with moving forward with
this KIP as-is, but we should be aware of potential limitations for DSL
users. We can always do a follow up KIP to close gaps when we

understand

the impact better -- covering the DSL would also expand the scope of
this KIP significantly...

About the metric: just to double check. Do we think it's worth to add a
new metric? Or could we re-use the existing "dropped record metric"?



-Matthias


On 4/10/24 5:11 AM, Sebastien Viale wrote:

Hi,

You are right, it will simplify types.

We updated the KIP

regards

Sébastien VIALE

MICHELIN GROUP - Information Technology
Technical Expert Kafka

Carmes / Bâtiment A17 4e / 63100 Clermont-Ferrand


From: Bruno Cadonna 
Sent: Wednesday, April 10, 2024 10:38
To: dev@kafka.apache.org 
Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception
handler for exceptions occuring during processing

Hi Loïc, Damien, and Sébastien,

Great that we are converging!


3.
Damien and Loïc, I think in your examples the handler will receive
Record because an Record is passed to
the processor in the following code line:


https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152
<
https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152




I see that we do not need to pass into the handler a
Record<byte[], byte[]> just because we do that for the
DeserializationExceptionHandler

and the ProductionExceptionHandler. When those two handlers are

called,

the record is already serialized. This is not the case for the
ProcessingExceptionHandler. However, I would propose to use Record<?, ?>

for the record that is passed to t

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-21 Thread Damien Gasparina
> > > 4.
> > > Regarding the metric, thanks for pointing to the dropped-record metric,
> > > Matthias. The dropped-record metric is used with the deserialization
> > > handler and the production handler. So, it would make sense to also use
> > > it for this handler. However, the dropped-record metric only records
> > > records that are skipped by the handler and not the number of calls to
> > > the handler. But that difference is probably irrelevant since in case of
> > > FAIL, the metric will be reset anyways since the stream thread will be
> > > restarted. In conclusion, I think the dropped-record metric in
> > > combination with a warn log message might be the better choice than
> > > introducing a new metric.
> > >
> > >
> > > 8.
> > > Regarding the DSL, I think we should close possible gaps in a separate
> > KIP.
> > >
> > >
> > > Best,
> > > Bruno
> > >
> > > On 4/11/24 12:06 AM, Matthias J. Sax wrote:
> > > > Thanks for the KIP. Great discussion.
> > > >
> > > > I am not sure if I understand the proposal from Bruno to hand in the
> > > > processor node id? Isn't this internal (could not even find it
> > quickly).
> > > > We do have a processor name, right? Or do I mix up something?
> > > >
> > > > Another question is about `ProcessingContext` -- it contains a lot of
> > > > (potentially irrelevant?) metadata. We should think carefully about
> > what
> > > > we want to pass in and what not -- removing stuff is hard, but adding
> > > > stuff is easy. It's always an option to create a new interface that
> > only
> > > > exposes stuff we find useful, and allows us to evolve this interface
> > > > independent of others. Re-using an existing interface always has the
> > > > danger of introducing an undesired coupling that could bite us in the
> > > > future. -- It makes total sense to pass in `RecordMetadata`, but
> > > > `ProcessingContext` (even if already limited compared to
> > > > `ProcessorContext`) still seems to be too broad? For example, there is
> > > > `getStateStore()` and `schedule()` methods which I think we should not
> > > > expose.
> > > >
> > > > The other interesting question is about "what record gets passed in".
> > > > For the PAPI, passing in the Processor's input record make a lot of
> > > > sense. However, for DSL operators, I am not 100% sure? The DSL often
> > > > uses internal types not exposed to the user, and thus I am not sure if
> > > > users could write useful code for this case? -- In general, I still
> > > > agree that the handler should be implemented with a try-catch around
> > > > `Processor.process()` but it might not be too useful for DSL processors.
> > > > Hence, I am wondering if we need to do something more in the DSL? I
> > > > don't have a concrete proposal (a few high level ideas only) and if we
> > > > don't do anything special for the DSL I am ok with moving forward with
> > > > this KIP as-is, but we should be aware of potential limitations for DSL
> > > > users. We can always do a follow up KIP to close gaps when we
> > understand
> > > > the impact better -- covering the DSL would also expand the scope of
> > > > this KIP significantly...
> > > >
> > > > About the metric: just to double check. Do we think it's worth to add a
> > > > new metric? Or could we re-use the existing "dropped record metric"?
> > > >
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > On 4/10/24 5:11 AM, Sebastien Viale wrote:
> > > >> Hi,
> > > >>
> > > >> You are right, it will simplify types.
> > > >>
> > > >> We updated the KIP
> > > >>
> > > >> regards
> > > >>
> > > >> Sébastien VIALE
> > > >>
> > > >> MICHELIN GROUP - Information Technology
> > > >> Technical Expert Kafka
> > > >>
> > > >>   Carmes / Bâtiment A17 4e / 63100 Clermont-Ferrand
> > > >>
> > > >> From: Bruno Cadonna 
> > > >> Sent: Wednesday, April 10, 2024 10:38
> > > >> To: dev@k

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-12 Thread Sophie Blee-Goldman
 a lot of
> > > (potentially irrelevant?) metadata. We should think carefully about
> what
> > > we want to pass in and what not -- removing stuff is hard, but adding
> > > stuff is easy. It's always an option to create a new interface that
> only
> > > exposes stuff we find useful, and allows us to evolve this interface
> > > independent of others. Re-using an existing interface always has the
> > > danger of introducing an undesired coupling that could bite us in the
> > > future. -- It makes total sense to pass in `RecordMetadata`, but
> > > `ProcessingContext` (even if already limited compared to
> > > `ProcessorContext`) still seems to be too broad? For example, there is
> > > `getStateStore()` and `schedule()` methods which I think we should not
> > > expose.
> > >
> > > The other interesting question is about "what record gets passed in".
> > > For the PAPI, passing in the Processor's input record make a lot of
> > > sense. However, for DSL operators, I am not 100% sure? The DSL often
> > > uses internal types not exposed to the user, and thus I am not sure if
> > > users could write useful code for this case? -- In general, I still
> > > agree that the handler should be implemented with a try-catch around
> > > `Processor.process()` but it might not be too useful for DSL processors.
> > > Hence, I am wondering if we need to do something more in the DSL? I
> > > don't have a concrete proposal (a few high level ideas only) and if we
> > > don't do anything special for the DSL I am ok with moving forward with
> > > this KIP as-is, but we should be aware of potential limitations for DSL
> > > users. We can always do a follow up KIP to close gaps when we
> understand
> > > the impact better -- covering the DSL would also expand the scope of
> > > this KIP significantly...
> > >
> > > About the metric: just to double check. Do we think it's worth to add a
> > > new metric? Or could we re-use the existing "dropped record metric"?
> > >
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 4/10/24 5:11 AM, Sebastien Viale wrote:
> > >> Hi,
> > >>
> > >> You are right, it will simplify types.
> > >>
> > >> We updated the KIP
> > >>
> > >> regards
> > >>
> > >> Sébastien VIALE
> > >>
> > >> MICHELIN GROUP - Information Technology
> > >> Technical Expert Kafka
> > >>
> > >>   Carmes / Bâtiment A17 4e / 63100 Clermont-Ferrand
> > >>
> > >> From: Bruno Cadonna 
> > >> Sent: Wednesday, April 10, 2024 10:38
> > >> To: dev@kafka.apache.org 
> > >> Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception
> > >> handler for exceptions occuring during processing
> > >>
> > >> Hi Loïc, Damien, and Sébastien,
> > >>
> > >> Great that we are converging!
> > >>
> > >>
> > >> 3.
> > >> Damien and Loïc, I think in your examples the handler will receive
> > >> Record<Object, Object> because a Record<Object, Object> is passed to
> > >> the processor in the following code line:
> > >>
> https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152
> <
> https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152
> >
> > >>
> > >> I see that we do not need to pass into the handler a
> > >> Record<byte[], byte[]> just because we do that for the
> > >> DeserializationExceptionHandler
> > >> and the ProductionExceptionHandler. When those two handlers are
> called,
> > >> the record is already serialized. This is not the case for the
> > >> ProcessingExceptionHandler. However, I would propose to use Record<?, ?>
> > >> for the record that is passed to the ProcessingExceptionHandler
> because
> > >> it makes the handler API more flexible.
> > >>
> > >>
> > >> Best,
> > >> Bruno
> > >

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-11 Thread Damien Gasparina
ly) and if we
> > don't do anything special for the DSL I am ok with moving forward with
> > this KIP as-is, but we should be aware of potential limitations for DSL
> > users. We can always do a follow up KIP to close gaps when we understand
> > the impact better -- covering the DSL would also expand the scope of
> > this KIP significantly...
> >
> > About the metric: just to double check. Do we think it's worth to add a
> > new metric? Or could we re-use the existing "dropped record metric"?
> >
> >
> >
> > -Matthias
> >
> >
> > On 4/10/24 5:11 AM, Sebastien Viale wrote:
> >> Hi,
> >>
> >> You are right, it will simplify types.
> >>
> >> We updated the KIP
> >>
> >> regards
> >>
> >> Sébastien VIALE
> >>
> >> MICHELIN GROUP - Information Technology
> >> Technical Expert Kafka
> >>
> >>   Carmes / Bâtiment A17 4e / 63100 Clermont-Ferrand
> >>
> >> From: Bruno Cadonna 
> >> Sent: Wednesday, April 10, 2024 10:38
> >> To: dev@kafka.apache.org 
> >> Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception
> >> handler for exceptions occuring during processing
> >>
> >> Hi Loïc, Damien, and Sébastien,
> >>
> >> Great that we are converging!
> >>
> >>
> >> 3.
> >> Damien and Loïc, I think in your examples the handler will receive
> >> Record<Object, Object> because a Record<Object, Object> is passed to
> >> the processor in the following code line:
> >> https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152
> >>  
> >> <https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152>
> >>
> >> I see that we do not need to pass into the handler a Record<byte[], byte[]> just because we do that for the DeserializationExceptionHandler
> >> and the ProductionExceptionHandler. When those two handlers are called,
> >> the record is already serialized. This is not the case for the
> >> ProcessingExceptionHandler. However, I would propose to use Record<?, ?>
> >> for the record that is passed to the ProcessingExceptionHandler because
> >> it makes the handler API more flexible.
> >>
> >>
> >> Best,
> >> Bruno
> >>
> >>
> >>
> >>
> >>
> >> On 4/9/24 9:09 PM, Loic Greffier wrote:
> >>  > Hi Bruno and Bill,
> >>  >
> >>  > To complete the Damien's purposes about the point 3.
> >>  >
> >>  > Processing errors are caught and handled by the
> >> ProcessingErrorHandler, at the precise moment when records are
> >> processed by processor nodes. The handling will be performed in the
> >> "process" method of the ProcessorNode, such as:
> >>  >
> >>  > public void process(final Record<KIn, VIn> record) {
> >>  >     ...
> >>  >
> >>  >     try {
> >>  >         ...
> >>  >     } catch (final ClassCastException e) {
> >>  >         ...
> >>  >     } catch (Exception e) {
> >>  >         ProcessingExceptionHandler.ProcessingHandlerResponse response = this.processingExceptionHandler
> >>  >             .handle(internalProcessorContext, (Record) record, e);
> >>  >
> >>  >         if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
> >>  >             throw new StreamsException("Processing exception handler is set to fail upon" +
> >>  >                 " a processing error. If you would rather have the streaming pipeline" +
> >>  >                 " continue after a processing error, please set the " +
> >>  >                 DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
> >>  >                 e);
> >>  >         }
> >>  >     }
> >>  > }
> >>  > As you can see, the record is transmitted to the
> >> ProcessingExceptionHandler as a Record<K, V>, as we are
> >> dealing with the inp

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-11 Thread Bruno Cadonna

Hi Matthias,


1.a
With processor node ID, I mean the ID that is exposed in the tags of 
processor node metrics. That ID cannot be internal since it is exposed 
in metrics. I think the processor name and the processor node ID is the 
same thing. I followed how the processor node ID is set in metrics and I 
ended up in addProcessor(name, ...).



1.b
Regarding ProcessingContext, I also thought about a separate class to 
pass-in context information into the handler, but then I dismissed the 
idea because I thought I was overthinking it. Apparently, I was not 
overthinking it if you also had the same idea. So let's consider a 
separate class.



4.
Regarding the metric, thanks for pointing to the dropped-record metric, 
Matthias. The dropped-record metric is used with the deserialization 
handler and the production handler. So, it would make sense to also use 
it for this handler. However, the dropped-record metric only records 
records that are skipped by the handler and not the number of calls to 
the handler. But that difference is probably irrelevant since in case of 
FAIL, the metric will be reset anyways since the stream thread will be 
restarted. In conclusion, I think the dropped-record metric in 
combination with a warn log message might be the better choice than 
introducing a new metric.
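
For reference, users can already observe that metric programmatically; a 
minimal sketch, assuming a running KafkaStreams instance kafkaStreams and 
the task-level metric name dropped-records-total from KIP-444:

kafkaStreams.metrics().forEach((name, metric) -> {
    if ("dropped-records-total".equals(name.name())) {
        // tags carry thread-id and task-id, so values can be rolled up as needed
        System.out.println(name.tags() + " -> " + metric.metricValue());
    }
});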



8.
Regarding the DSL, I think we should close possible gaps in a separate KIP.


Best,
Bruno

On 4/11/24 12:06 AM, Matthias J. Sax wrote:

Thanks for the KIP. Great discussion.

I am not sure if I understand the proposal from Bruno to hand in the 
processor node id? Isn't this internal (could not even find it quickly). 
We do have a processor name, right? Or do I mix up something?


Another question is about `ProcessingContext` -- it contains a lot of 
(potentially irrelevant?) metadata. We should think carefully about what 
we want to pass in and what not -- removing stuff is hard, but adding 
stuff is easy. It's always an option to create a new interface that only 
exposes stuff we find useful, and allows us to evolve this interface 
independent of others. Re-using an existing interface always has the 
danger of introducing an undesired coupling that could bite us in the 
future. -- It makes total sense to pass in `RecordMetadata`, but 
`ProcessingContext` (even if already limited compared to 
`ProcessorContext`) still seems to be too broad? For example, there is 
`getStateStore()` and `schedule()` methods which I think we should not 
expose.


The other interesting question is about "what record gets passed in". 
For the PAPI, passing in the Processor's input record make a lot of 
sense. However, for DSL operators, I am not 100% sure? The DSL often 
uses internal types not exposed to the user, and thus I am not sure if 
users could write useful code for this case? -- In general, I still 
agree that the handler should be implemented with a try-catch around 
`Processor.process()` but it might not be too useful for DSL processors. 
Hence, I am wondering if we need to do something more in the DSL? I 
don't have a concrete proposal (a few high level ideas only) and if we 
don't do anything special for the DSL I am ok with moving forward with 
this KIP as-is, but we should be aware of potential limitations for DSL 
users. We can always do a follow up KIP to close gaps when we understand 
the impact better -- covering the DSL would also expand the scope of 
this KIP significantly...


About the metric: just to double check. Do we think it's worth to add a 
new metric? Or could we re-use the existing "dropped record metric"?




-Matthias


On 4/10/24 5:11 AM, Sebastien Viale wrote:

Hi,

You are right, it will simplify types.

We updated the KIP

regards

Sébastien VIALE

MICHELIN GROUP - Information Technology
Technical Expert Kafka

Carmes / Bâtiment A17 4e / 63100 Clermont-Ferrand


From: Bruno Cadonna 
Sent: Wednesday, April 10, 2024 10:38
To: dev@kafka.apache.org 
Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception
handler for exceptions occuring during processing


Hi Loïc, Damien, and Sébastien,

Great that we are converging!


3.
Damien and Loïc, I think in your examples the handler will receive
Record<Object, Object> because a Record<Object, Object> is passed to
the processor in the following code line:
https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152
 
<https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152>

I see that we do not need to pass into the handler a Record<byte[], byte[]> just because we do that for the DeserializationExceptionHandler
and the ProductionEx

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-10 Thread Matthias J. Sax

Thanks for the KIP. Great discussion.

I am not sure if I understand the proposal from Bruno to hand in the 
processor node id? Isn't this internal (could not even find it quickly). 
We do have a processor name, right? Or do I mix up something?


Another question is about `ProcessingContext` -- it contains a lot of 
(potentially irrelevant?) metadata. We should think carefully about what 
we want to pass in and what not -- removing stuff is hard, but adding 
stuff is easy. It's always an option to create a new interface that only 
exposes stuff we find useful, and allows us to evolve this interface 
independent of others. Re-using an existing interface always has the 
danger of introducing an undesired coupling that could bite us in the 
future. -- It makes total sense to pass in `RecordMetadata`, but 
`ProcessingContext` (even if already limited compared to 
`ProcessorContext`) still seems to be too broad? For example, there is 
`getStateStore()` and `schedule()` methods which I think we should not 
expose.
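
To make that concrete, such a purpose-built interface might expose only 
something like the following (all names are hypothetical and only sketch 
the shape, they are not a proposal of the final API):

public interface ErrorHandlerContext {
    String applicationId();
    TaskId taskId();
    String processorNodeId();
    Optional<RecordMetadata> recordMetadata(); // topic/partition/offset, if available
}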


The other interesting question is about "what record gets passed in". 
For the PAPI, passing in the Processor's input record make a lot of 
sense. However, for DSL operators, I am not 100% sure? The DSL often 
uses internal types not exposed to the user, and thus I am not sure if 
users could write useful code for this case? -- In general, I still 
agree that the handler should be implemented with a try-catch around 
`Processor.process()` but it might not be too useful for DSL processors. 
Hence, I am wondering if we need to do something more in the DSL? I 
don't have a concrete proposal (a few high level ideas only) and if we 
don't do anything special for the DSL I am ok with moving forward with 
this KIP as-is, but we should be aware of potential limitations for DSL 
users. We can always do a follow up KIP to close gaps when we understand 
the impact better -- covering the DSL would also expand the scope of 
this KIP significantly...


About the metric: just to double check. Do we think it's worth to add a 
new metric? Or could we re-use the existing "dropped record metric"?




-Matthias


On 4/10/24 5:11 AM, Sebastien Viale wrote:

Hi,

You are right, it will simplify types.

We updated the KIP

regards

Sébastien VIALE

MICHELIN GROUP - Information Technology
Technical Expert Kafka

Carmes / Bâtiment A17 4e / 63100 Clermont-Ferrand


From: Bruno Cadonna 
Sent: Wednesday, April 10, 2024 10:38
To: dev@kafka.apache.org 
Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception
handler for exceptions occuring during processing


Hi Loïc, Damien, and Sébastien,

Great that we are converging!


3.
Damien and Loïc, I think in your examples the handler will receive
Record<Object, Object> because a Record<Object, Object> is passed to
the processor in the following code line:
https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152
 
<https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152>

I see that we do not need to pass into the handler a Record<byte[], byte[]> just because we do that for the DeserializationExceptionHandler
and the ProductionExceptionHandler. When those two handlers are called,
the record is already serialized. This is not the case for the
ProcessingExceptionHandler. However, I would propose to use Record<?, ?>
for the record that is passed to the ProcessingExceptionHandler because
it makes the handler API more flexible.


Best,
Bruno






On 4/9/24 9:09 PM, Loic Greffier wrote:
 > Hi Bruno and Bill,
 >
 > To complete the Damien's purposes about the point 3.
 >
 > Processing errors are caught and handled by the 
ProcessingErrorHandler, at the precise moment when records are processed 
by processor nodes. The handling will be performed in the "process" 
method of the ProcessorNode, such as:

 >
 > public void process(final Record<KIn, VIn> record) {
 >     ...
 >
 >     try {
 >         ...
 >     } catch (final ClassCastException e) {
 >         ...
 >     } catch (Exception e) {
 >         ProcessingExceptionHandler.ProcessingHandlerResponse response = this.processingExceptionHandler
 >             .handle(internalProcessorContext, (Record) record, e);
 >
 >         if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
 >             throw new StreamsException("Processing exception handler is set to fail upon" +
 >                 " a processing error. If you would rather have the streaming pipeline" +
 >                 " continue after a processing error, please s

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-10 Thread Bruno Cadonna

Hi Loïc, Damien, and Sébastien,

Great that we are converging!


3.
Damien and Loïc, I think in your examples the handler will receive 
Record<Object, Object> because a Record<Object, Object> is passed to 
the processor in the following code line:

https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152

I see that we do not need to pass into the handler a Record<byte[], byte[]> just because we do that for the DeserializationExceptionHandler 
and the ProductionExceptionHandler. When those two handlers are called, 
the record is already serialized. This is not the case for the 
ProcessingExceptionHandler. However, I would propose to use Record<?, ?> 
for the record that is passed to the ProcessingExceptionHandler because 
it makes the handler API more flexible.
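
For example, with Record<?, ?> a log-and-continue style handler stays 
generic over whatever types flow through the processor (a sketch only; 
the interface and enum names follow this thread and may still change, 
and log is an assumed slf4j Logger):

public class LogAndContinueProcessingExceptionHandler implements ProcessingExceptionHandler {
    @Override
    public ProcessingHandlerResponse handle(final ProcessingContext context,
                                            final Record<?, ?> record,
                                            final Exception exception) {
        // log and skip the record instead of failing the stream thread
        log.warn("Exception caught during processing, key: {}", record.key(), exception);
        return ProcessingHandlerResponse.CONTINUE;
    }
}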



Best,
Bruno


On 4/9/24 9:09 PM, Loic Greffier wrote:

Hi Bruno and Bill,

To complete the Damien's purposes about the point 3.

Processing errors are caught and handled by the ProcessingErrorHandler, at the precise 
moment when records are processed by processor nodes. The handling will be performed in 
the "process" method of the ProcessorNode, such as:

public void process(final Record<KIn, VIn> record) {
    ...

    try {
        ...
    } catch (final ClassCastException e) {
        ...
    } catch (Exception e) {
        ProcessingExceptionHandler.ProcessingHandlerResponse response = this.processingExceptionHandler
            .handle(internalProcessorContext, (Record) record, e);

        if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
            throw new StreamsException("Processing exception handler is set to fail upon" +
                " a processing error. If you would rather have the streaming pipeline" +
                " continue after a processing error, please set the " +
                DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
                e);
        }
    }
}
As you can see, the record is transmitted to the ProcessingExceptionHandler as a
Record<K, V>, as we are dealing with the input record of the processor at
this point. It can be any type, including non-serializable types, as suggested by
Damien's example. As the ProcessingErrorHandler is not intended to perform any
serialization, there should be no issue for the users to handle a
Record<K, V>.

I follow Damien on the other points.

For point 6, underlying public interfaces are renamed as well:
- The ProcessingHandlerResponse
- The 
ProcessingLogAndContinueExceptionHandler/ProcessingLogAndFailExceptionHandler
- The configuration DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG 
(default.processing.exception.handler)

Regards,

Loïc

From: Damien Gasparina 
Sent: Tuesday, April 9, 2024 20:08
To: dev@kafka.apache.org
Subject: Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler 
for exceptions occuring during processing


Hi Bruno, Bill,

First of all, thanks a lot for all your useful comments.


1. and 2.
I am wondering whether we should expose the processor node ID -- which
basically is the processor node name -- in the ProcessingContext
interface. I think the processor node ID fits well in the
ProcessingContext interface since it already contains application ID and
task ID and it would make the API for the handler cleaner.


That's a good point, the actual ProcessorContextImpl is already holding the
current node in an attribute (currentNode), thus exposing the node ID should
not be a problem. Let me sleep on it and get back to you regarding this
point.


3.
Could you elaborate -- maybe with an example -- when a record is in a
state in which it cannot be serialized? This is not completely clear to

me.

The Record passed to the handler is the input record to the processor. In
the Kafka Streams API, it could be any POJO.
e.g. with the following topology:
streamsBuilder.stream("x")
    .map((k, v) -> new KeyValue<>("foo", Pair.of("hello", "world")))
    .foreach((k, v) -> { throw new RuntimeException(); });
I would expect the handler to receive a Record<String, Pair<String, String>>.


4.
Regarding the metrics, it is not entirely clear to me what the metric
measures. Is it the number of calls to the process handler or is it the
number of calls to process handler that returned FAIL?
If it is the former, I was also wondering whether it would be better to
put the task-level metrics to INFO reporting level and remove the
thread-level metric, similar to the dropped-records metric. You can
always roll-up the metrics to the thread level in your preferred
monitoring system. Or do you think we end up with too many metrics?


We were thinking of the for

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-09 Thread Loic Greffier
Hi Bruno and Bill,

To complete the Damien's purposes about the point 3.

Processing errors are caught and handled by the ProcessingErrorHandler, at the 
precise moment when records are processed by processor nodes. The handling will 
be performed in the "process" method of the ProcessorNode, such as:

public void process(final Record<KIn, VIn> record) {
    ...

    try {
        ...
    } catch (final ClassCastException e) {
        ...
    } catch (Exception e) {
        ProcessingExceptionHandler.ProcessingHandlerResponse response = this.processingExceptionHandler
            .handle(internalProcessorContext, (Record) record, e);

        if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
            throw new StreamsException("Processing exception handler is set to fail upon" +
                " a processing error. If you would rather have the streaming pipeline" +
                " continue after a processing error, please set the " +
                DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
                e);
        }
    }
}
As you can see, the record is transmitted to the ProcessingExceptionHandler as
a Record<K, V>, as we are dealing with the input record of the
processor at this point. It can be any type, including non-serializable types,
as suggested by Damien's example. As the ProcessingErrorHandler is not
intended to perform any serialization, there should be no issue for the users
to handle a Record<K, V>.

I follow Damien on the other points.

For point 6, underlying public interfaces are renamed as well:
- The ProcessingHandlerResponse
- The 
ProcessingLogAndContinueExceptionHandler/ProcessingLogAndFailExceptionHandler
- The configuration DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG 
(default.processing.exception.handler)
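
For example, wiring the renamed default handler would then look like this 
sketch (the config constant and its exact location may still evolve with 
the KIP, and "my-app" is a placeholder application ID):

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
    ProcessingLogAndContinueExceptionHandler.class.getName());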

Regards,

Loïc

From: Damien Gasparina 
Sent: Tuesday, April 9, 2024 20:08
To: dev@kafka.apache.org
Subject: Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler 
for exceptions occuring during processing


Hi Bruno, Bill,

First of all, thanks a lot for all your useful comments.

> 1. and 2.
> I am wondering whether we should expose the processor node ID -- which
> basically is the processor node name -- in the ProcessingContext
> interface. I think the processor node ID fits well in the
> ProcessingContext interface since it already contains application ID and
> task ID and it would make the API for the handler cleaner.

That's a good point, the actual ProcessorContextImpl is already holding the
current node in an attribute (currentNode), thus exposing the node ID should
not be a problem. Let me sleep on it and get back to you regarding this
point.
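
For illustration, exposing it could be as small as the following sketch in
ProcessorContextImpl (the accessor name processorNodeId() is hypothetical):

public String processorNodeId() {
    final ProcessorNode<?, ?, ?, ?> node = currentNode();
    return node == null ? null : node.name();
}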

> 3.
> Could you elaborate -- maybe with an example -- when a record is in a
> state in which it cannot be serialized? This is not completely clear to
me.

The Record passed to the handler is the input record to the processor. In
the Kafka Streams API, it could be any POJO.
e.g. with the following topology:

streamsBuilder.stream("x")
    .map((k, v) -> new KeyValue<>("foo", Pair.of("hello", "world")))
    .foreach((k, v) -> { throw new RuntimeException(); });

I would expect the handler to receive a Record<String, Pair<String, String>>.

> 4.
> Regarding the metrics, it is not entirely clear to me what the metric
> measures. Is it the number of calls to the process handler or is it the
> number of calls to process handler that returned FAIL?
> If it is the former, I was also wondering whether it would be better to
> put the task-level metrics to INFO reporting level and remove the
> thread-level metric, similar to the dropped-records metric. You can
> always roll-up the metrics to the thread level in your preferred
> monitoring system. Or do you think we end up with too many metrics?

We were thinking of the former, measuring the number of calls to the
process handler. That's a good point, having the information at the task
level could be beneficial. I updated the KIP to change the metric level
and to clarify the wording.
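
As a side note, whatever the final name and level end up being, the metric
would be observable like any other task-level metric via KafkaStreams#metrics().
A quick sketch, filtering on the usual "stream-task-metrics" group since the
metric name itself is not final:

// Sketch: dump all task-level metrics of a running application; the new
// processing-error metric would show up here once implemented.
for (final Map.Entry<MetricName, ? extends Metric> entry :
        streams.metrics().entrySet()) {
    final MetricName name = entry.getKey();
    if ("stream-task-metrics".equals(name.group())) {
        System.out.printf("%s %s = %s%n",
                name.name(), name.tags(), entry.getValue().metricValue());
    }
}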

> 5.
> What do you think about naming the handler ProcessingExceptionHandler
> instead of ProcessExceptionHandler?
> The DeserializationExceptionHandler and the ProductionExceptionHandler
> also use the noun of the action in their name and not the verb.

Good catch, I updated the KIP to rename it ProcessingExceptionHandler.

> 6.
> What record is exactly passed to the handler?
> Is it the input record to the task? Is it the input record to the
> processor node? Is it the input record to the processor?

The input record of the processor. I assume that is the most
user-friendly record in t

Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-09 Thread Damien Gasparina
> > > basically is the processor node name -- in the ProcessingContext
> > > interface. I think the processor node ID fits well in the
> > > ProcessingContext interface since it already contains application ID
> and
> > > task ID and it would make the API for the handler cleaner.
> > >
> > >
> > > 3.
> > > Could you elaborate -- maybe with an example -- when a record is in a
> > > state in which it cannot be serialized? This is not completely clear to
> > me.
> > >
> > >
> > > 4.
> > > Regarding the metrics, it is not entirely clear to me what the metric
> > > measures. Is it the number of calls to the process handler or is it the
> > > number of calls to process handler that returned FAIL?
> > > If it is the former, I was also wondering whether it would be better to
> > > put the task-level metrics to INFO reporting level and remove the
> > > thread-level metric, similar to the dropped-records metric. You can
> > > always roll-up the metrics to the thread level in your preferred
> > > monitoring system. Or do you think we end up with too many metrics?
> > >
> > >
> > > 5.
> > > What do you think about naming the handler ProcessingExceptionHandler
> > > instead of ProcessExceptionHandler?
> > > The DeserializationExceptionHandler and the ProductionExceptionHandler
> > > also use the noun of the action in their name and not the verb.
> > >
> > >
> > > Best,
> > > Bruno
> > >
> > >
> > > On 4/8/24 3:48 PM, Sebastien Viale wrote:
> > >> Thanks for your review!
> > >>
> > >>   All the points make sense for us!
> > >>
> > >>
> > >>
> > >> We updated the KIP for points 1 and 4.
> > >>
> > >>
> > >>
> > >> 2/ We followed the DeserializationExceptionHandler interface
> > >> signature, it was not on our mind that the record be forwarded with
> > >> the ProcessorContext.
> > >>
> > >> The ProcessingContext is sufficient, we do expect that most people
> > >> would need to access the RecordMetadata.
> > >>
> > >>
> > >>
> > >> 3/ The use of Record<Object, Object> is required, as the error could
> > >> occur in the middle of a processor where records could be
> > >> non-serializable objects
> > >>
> > >>   As it is a global error catching, the user may need little
> > >> information about the faulty record.
> > >>
> > >>   Assuming that users want to make some specific treatments to the
> > >> record, they can add a try / catch block in the topology.
> > >>
> > >>   It is up to users to cast record value and key in the implementation
> > >> of the ProcessorExceptionHandler.
> > >>
> > >>
> > >>
> > >> Cheers
> > >>
> > >> Loïc, Damien and Sébastien
> > >>
> > >> 
> > >> De : Sophie Blee-Goldman 
> > >> Envoyé : samedi 6 avril 2024 01:08
> > >> À : dev@kafka.apache.org 
> > >> Objet : [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception
> > >> handler for exceptions occuring during processing
> > >>
> > >>
> > >> Hi Damien,
> > >>
> > >> First off thanks for the KIP, this is definitely a much needed
> > >> feature. On
> > >> the
> > >> whole it seems pretty straightforward and I am in favor of the
> proposal.
> > >> Just
> > >> a few questions and suggestions here and there:
> > >>
> > >> 1. One of the #handle method's parameters is "ProcessorNode node", but
> > >> ProcessorNode is an internal class (and would expose a lot of
> internals
> > >> that we probably don't want to pass in to an exception handler). Would
> > it
> > >> be sufficient to just make this a String and pass in the processor
> name?
> > >>
> > >> 2. Another of the parameters is the ProcessorContext. This would
> enable
> > >> the handler to potentially forward records, which imo should not be
> done
> > >> from the handler since it could only ever call #forward but not direct

Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-09 Thread Bill Bejeck
Hi Damien, Sebastien and Loic,

Thanks for the KIP, this is a much-needed addition.
I like the approach of getting the plumbing in for handling processor
errors, allowing users to implement more complex solutions as needed.

Overall, where the KIP is now LGTM, modulo outstanding comments.  I
think adding the example you included in this thread to the KIP is a great
idea.

Regarding the metrics, I'm thinking along the same lines as Bruno.  I'm
wondering if we can make do with a task-level metric at the INFO level and
the processor metric at DEBUG.  IMHO, when it comes to tracking exceptions
in processing, these two areas are where users will want to focus, higher
level metrics wouldn't be as useful in this case.

Thanks,
Bill

On Tue, Apr 9, 2024 at 6:54 AM Bruno Cadonna  wrote:

> Hi again,
>
> I have additional questions/comments.
>
> 6.
> What record is exactly passed to the handler?
> Is it the input record to the task? Is it the input record to the
> processor node? Is it the input record to the processor?
>
>
> 7.
> Could you please add the packages of the Java classes/interfaces/enums
> you want to add?
>
>
> Best,
> Bruno
>
>
> On 4/9/24 10:17 AM, Bruno Cadonna wrote:
> > Hi Loïc, Damien, and Sébastien,
> >
> > Thanks for the KIP!
> > I find it really great that you contribute back to Kafka Streams
> > concepts you developed for kstreamplify so that everybody can take
> > advantage of your improvements.
> >
> > I have a couple of questions/comments:
> >
> > 1. and 2.
> > I am wondering whether we should expose the processor node ID -- which
> > basically is the processor node name -- in the ProcessingContext
> > interface. I think the processor node ID fits well in the
> > ProcessingContext interface since it already contains application ID and
> > task ID and it would make the API for the handler cleaner.
> >
> >
> > 3.
> > Could you elaborate -- maybe with an example -- when a record is in a
> > state in which it cannot be serialized? This is not completely clear to
> me.
> >
> >
> > 4.
> > Regarding the metrics, it is not entirely clear to me what the metric
> > measures. Is it the number of calls to the process handler or is it the
> > number of calls to process handler that returned FAIL?
> > If it is the former, I was also wondering whether it would be better to
> > put the task-level metrics to INFO reporting level and remove the
> > thread-level metric, similar to the dropped-records metric. You can
> > always roll-up the metrics to the thread level in your preferred
> > monitoring system. Or do you think we end up with too many metrics?
> >
> >
> > 5.
> > What do you think about naming the handler ProcessingExceptionHandler
> > instead of ProcessExceptionHandler?
> > The DeserializationExceptionHandler and the ProductionExceptionHandler
> > also use the noun of the action in their name and not the verb.
> >
> >
> > Best,
> > Bruno
> >
> >
> > On 4/8/24 3:48 PM, Sebastien Viale wrote:
> >> Thanks for your review!
> >>
> >>   All the points make sense for us!
> >>
> >>
> >>
> >> We updated the KIP for points 1 and 4.
> >>
> >>
> >>
> >> 2/ We followed the DeserializationExceptionHandler interface
> >> signature, it was not on our mind that the record be forwarded with
> >> the ProcessorContext.
> >>
> >> The ProcessingContext is sufficient, we do expect that most people
> >> would need to access the RecordMetadata.
> >>
> >>
> >>
> >> 3/ The use of Record<Object, Object> is required, as the error could
> >> occur in the middle of a processor where records could be
> >> non-serializable objects
> >>
> >>   As it is a global error catching, the user may need little
> >> information about the faulty record.
> >>
> >>   Assuming that users want to make some specific treatments to the
> >> record, they can add a try / catch block in the topology.
> >>
> >>   It is up to users to cast record value and key in the implementation
> >> of the ProcessorExceptionHandler.
> >>
> >>
> >>
> >> Cheers
> >>
> >> Loïc, Damien and Sébastien
> >>
> >> 
> >> De : Sophie Blee-Goldman 
> >> Envoyé : samedi 6 avril 2024 01:08
> >> À : dev@kafka.apache.org 
> >> Objet : [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception
> >> handler for exceptions occuring during 

Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-09 Thread Bruno Cadonna

Hi again,

I have additional questions/comments.

6.
What record is exactly passed to the handler?
Is it the input record to the task? Is it the input record to the 
processor node? Is it the input record to the processor?



7.
Could you please add the packages of the Java classes/interfaces/enums 
you want to add?



Best,
Bruno


On 4/9/24 10:17 AM, Bruno Cadonna wrote:

Hi Loïc, Damien, and Sébastien,

Thanks for the KIP!
I find it really great that you contribute back to Kafka Streams 
concepts you developed for kstreamplify so that everybody can take 
advantage of your improvements.


I have a couple of questions/comments:

1. and 2.
I am wondering whether we should expose the processor node ID -- which 
basically is the processor node name -- in the ProcessingContext 
interface. I think the processor node ID fits well in the 
ProcessingContext interface since it already contains application ID and 
task ID and it would make the API for the handler cleaner.



3.
Could you elaborate -- maybe with an example -- when a record is in a 
state in which it cannot be serialized? This is not completely clear to me.



4.
Regarding the metrics, it is not entirely clear to me what the metric 
measures. Is it the number of calls to the process handler or is it the 
number of calls to process handler that returned FAIL?
If it is the former, I was also wondering whether it would be better to 
put the task-level metrics to INFO reporting level and remove the 
thread-level metric, similar to the dropped-records metric. You can 
always roll-up the metrics to the thread level in your preferred 
monitoring system. Or do you think we end up with too many metrics?



5.
What do you think about naming the handler ProcessingExceptionHandler 
instead of ProcessExceptionHandler?
The DeserializationExceptionHandler and the ProductionExceptionHandler 
also use the noun of the action in their name and not the verb.



Best,
Bruno


On 4/8/24 3:48 PM, Sebastien Viale wrote:

Thanks for your review!

  All the points make sense for us!



We updated the KIP for points 1 and 4.



2/ We followed the DeserializationExceptionHandler interface 
signature, it was not on our mind that the record be forwarded with 
the ProcessorContext.


    The ProcessingContext is sufficient, we do expect that most people 
would need to access the RecordMetadata.




3/ The use of Record<Object, Object> is required, as the error could 
occur in the middle of a processor where records could be 
non-serializable objects


  As it is a global error catching, the user may need little 
information about the faulty record.


  Assuming that users want to make some specific treatments to the 
record, they can add a try / catch block in the topology.


  It is up to users to cast record value and key in the implementation 
of the ProcessorExceptionHandler.




Cheers

Loïc, Damien and Sébastien


De : Sophie Blee-Goldman 
Envoyé : samedi 6 avril 2024 01:08
À : dev@kafka.apache.org 
Objet : [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception 
handler for exceptions occuring during processing




Hi Damien,

First off thanks for the KIP, this is definitely a much needed 
feature. On

the
whole it seems pretty straightforward and I am in favor of the proposal.
Just
a few questions and suggestions here and there:

1. One of the #handle method's parameters is "ProcessorNode node", but
ProcessorNode is an internal class (and would expose a lot of internals
that we probably don't want to pass in to an exception handler). Would it
be sufficient to just make this a String and pass in the processor name?

2. Another of the parameters is the ProcessorContext. This would enable
the handler to potentially forward records, which imo should not be done
from the handler since it could only ever call #forward but not direct 
where

the record is actually forwarded to, and could cause confusion if users
aren't aware that the handler is effectively calling from the context 
of the

processor that threw the exception.
2a. If you don't explicitly want the ability to forward records, I would
suggest changing the type of this parameter to ProcessingContext, which
has all the metadata and useful info of the ProcessorContext but without
the
forwarding APIs. This would also let us sidestep the following issue:
2b. If you *do* want the ability to forward records, setting aside 
whether

that
in and of itself makes sense to do, we would need to pass in either a regular
ProcessorContext or a FixedKeyProcessorContext, depending on what kind
of processor it is. I'm not quite sure how we could design a clean API 
here,
so I'll hold off until you clarify whether you even want forwarding or 
not.
We would also need to split the input record into a Record vs 
FixedKeyRecord


3. One notable difference between th

Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-09 Thread Bruno Cadonna

Hi Loïc, Damien, and Sébastien,

Thanks for the KIP!
I find it really great that you contribute back to Kafka Streams 
concepts you developed for kstreamplify so that everybody can take 
advantage of your improvements.


I have a couple of questions/comments:

1. and 2.
I am wondering whether we should expose the processor node ID -- which 
basically is the processor node name -- in the ProcessingContext 
interface. I think the processor node ID fits well in the 
ProcessingContext interface since it already contains application ID and 
task ID and it would make the API for the handler cleaner.



3.
Could you elaborate -- maybe with an example -- when a record is in a 
state in which it cannot be serialized? This is not completely clear to me.



4.
Regarding the metrics, it is not entirely clear to me what the metric 
measures. Is it the number of calls to the process handler or is it the 
number of calls to process handler that returned FAIL?
If it is the former, I was also wondering whether it would be better to 
put the task-level metrics to INFO reporting level and remove the 
thread-level metric, similar to the dropped-records metric. You can 
always roll-up the metrics to the thread level in your preferred 
monitoring system. Or do you think we end up with too many metrics?



5.
What do you think about naming the handler ProcessingExceptionHandler 
instead of ProcessExceptionHandler?
The DeserializationExceptionHandler and the ProductionExceptionHandler 
also use the noun of the action in their name and not the verb.



Best,
Bruno


On 4/8/24 3:48 PM, Sebastien Viale wrote:

Thanks for your review!

  All the points make sense for us!



We updated the KIP for points 1 and 4.



2/ We followed the DeserializationExceptionHandler interface signature, it was 
not on our mind that the record be forwarded with the ProcessorContext.

The ProcessingContext is sufficient, we do expect that most people would 
need to access the RecordMetadata.



3/ The use of Record<Object, Object> is required, as the error could occur 
in the middle of a processor where records could be non-serializable objects

  As it is a global error catching, the user may need little information about 
the faulty record.

  Assuming that users want to make some specific treatments to the record, they 
can add a try / catch block in the topology.

  It is up to users to cast record value and key in the implementation of the 
ProcessorExceptionHandler.



Cheers

Loïc, Damien and Sébastien


De : Sophie Blee-Goldman 
Envoyé : samedi 6 avril 2024 01:08
À : dev@kafka.apache.org 
Objet : [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for 
exceptions occuring during processing


Hi Damien,

First off thanks for the KIP, this is definitely a much needed feature. On
the
whole it seems pretty straightforward and I am in favor of the proposal.
Just
a few questions and suggestions here and there:

1. One of the #handle method's parameters is "ProcessorNode node", but
ProcessorNode is an internal class (and would expose a lot of internals
that we probably don't want to pass in to an exception handler). Would it
be sufficient to just make this a String and pass in the processor name?

2. Another of the parameters is the ProcessorContext. This would enable
the handler to potentially forward records, which imo should not be done
from the handler since it could only ever call #forward but not direct where
the record is actually forwarded to, and could cause confusion if users
aren't aware that the handler is effectively calling from the context of the
processor that threw the exception.
2a. If you don't explicitly want the ability to forward records, I would
suggest changing the type of this parameter to ProcessingContext, which
has all the metadata and useful info of the ProcessorContext but without
the
forwarding APIs. This would also let us sidestep the following issue:
2b. If you *do* want the ability to forward records, setting aside whether
that
in and of itself makes sense to do, we would need to pass in either a regular
ProcessorContext or a FixedKeyProcessorContext, depending on what kind
of processor it is. I'm not quite sure how we could design a clean API here,
so I'll hold off until you clarify whether you even want forwarding or not.
We would also need to split the input record into a Record vs FixedKeyRecord

3. One notable difference between this handler and the existing ones you
pointed out, the Deserialization/ProductionExceptionHandler, is that the
records passed in to those are in serialized bytes, whereas the record
here would be POJOs. You account for this by making the parameter
type a Record<Object, Object>, but I just wonder how users would be
able to read the key/value and figure out what type it should be. For
example, would they need to maintain 

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-09 Thread Loic Greffier
Hi,

To complete Sébastien's answer on point 3, here is an example of how 
users could simply cast the record key or value, based on a custom process 
exception handler:

Properties streamProps = new Properties();
streamProps.put(StreamsConfig.DEFAULT_PROCESS_EXCEPTION_HANDLER_CLASS_CONFIG,
        CustomProcessExceptionHandler.class);

public class CustomProcessExceptionHandler implements ProcessExceptionHandler {

    @Override
    public ProcessHandlerResponse handle(ProcessingContext context, String nodeName,
                                         Record<Object, Object> record, Exception exception) {
        log.info("Error in node: {}, key: {}, value: {}, exception: {}",
                nodeName, record.key(), record.value(), exception);

        if (record.value() instanceof Animal) {
            Animal value = (Animal) record.value();
            // Do something
        }

        return ProcessHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

The example will be added to the KIP.

Regards

Sébastien, Damien and Loïc

De : Sebastien Viale 
Envoyé : lundi 8 avril 2024 15:49
À : dev@kafka.apache.org
Objet : RE: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler 
for exceptions occuring during processing

Thanks for your review!

All the points make sense for us!



We updated the KIP for points 1 and 4.



2/ We followed the DeserializationExceptionHandler interface signature, it was 
not on our mind that the record be forwarded with the ProcessorContext.

The ProcessingContext is sufficient, we do expect that most people would need 
to access the RecordMetadata.



3/ The use of Record<Object, Object> is required, as the error could occur 
in the middle of a processor where records could be non-serializable objects

As it is a global error catching, the user may need little information about 
the faulty record.

Assuming that users want to make some specific treatments to the record, they 
can add a try / catch block in the topology.

It is up to users to cast record value and key in the implementation of the 
ProcessorExceptionHandler.



Cheers

Loïc, Damien and Sébastien


De : Sophie Blee-Goldman 
Envoyé : samedi 6 avril 2024 01:08
À : dev@kafka.apache.org 
Objet : [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for 
exceptions occuring during processing


Hi Damien,

First off thanks for the KIP, this is definitely a much needed feature. On
the
whole it seems pretty straightforward and I am in favor of the proposal.
Just
a few questions and suggestions here and there:

1. One of the #handle method's parameters is "ProcessorNode node", but
ProcessorNode is an internal class (and would expose a lot of internals
that we probably don't want to pass in to an exception handler). Would it
be sufficient to just make this a String and pass in the processor name?

2. Another of the parameters is the ProcessorContext. This would enable
the handler to potentially forward records, which imo should not be done
from the handler since it could only ever call #forward but not direct where
the record is actually forwarded to, and could cause confusion if users
aren't aware that the handler is effectively calling from the context of the
processor that threw the exception.
2a. If you don't explicitly want the ability to forward records, I would
suggest changing the type of this parameter to ProcessingContext, which
has all the metadata and useful info of the ProcessorContext but without
the
forwarding APIs. This would also let us sidestep the following issue:
2b. If you *do* want the ability to forward records, setting aside whether
that
in and of itself makes sense to do, we would need to pass in either a regular
ProcessorContext or a FixedKeyProcessorContext, depending on what kind
of processor it is. I'm not quite sure how we could design a clean API here,
so I'll hold off until you clarify whether you even want forwarding or not.
We would also need to split the input record into a Record vs FixedKeyRecord

3. One notable difference between this handler and the existing ones you
pointed out, the Deserialization/ProductionExceptionHandler, is that the
records passed in to those are in serialized bytes, whereas the record
here would be POJOs. You account for this by making the parameter
type a Record<Object, Object>, but I just wonder how users would be
able to read the key/value and figure out what type it should be. For
example, would they need to maintain a map from processor name to
input record types?

If you could provide an example of this new feature in the KIP, it would be
very helpful in understanding whether we could do something to make it
easier for users to use, or if it would be fine as-is

4. We should include 

RE: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-08 Thread Sebastien Viale
Thanks for your review!

 All the points make sense for us!



We updated the KIP for points 1 and 4.



2/ We followed the DeserializationExceptionHandler interface signature, it was 
not on our mind that the record be forwarded with the ProcessorContext.

   The ProcessingContext is sufficient, we do expect that most people would 
need to access the RecordMetadata.



3/ The use of Record<Object, Object> is required, as the error could occur 
in the middle of a processor where records could be non-serializable objects

 As it is a global error catching, the user may need little information about 
the faulty record.

 Assuming that users want to make some specific treatments to the record, they 
can add a try / catch block in the topology.

 It is up to users to cast record value and key in the implementation of the 
ProcessorExceptionHandler.
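
To make that concrete, here is a minimal sketch of the try / catch approach
inside the topology; parse() is a hypothetical user function that may throw,
and log is an assumed SLF4J logger:

builder.<String, String>stream("input")
    .mapValues(value -> {
        try {
            return parse(value); // hypothetical user logic that may throw
        } catch (final RuntimeException e) {
            log.warn("Skipping record that failed processing: {}", value, e);
            return null; // marker value, filtered out below
        }
    })
    .filter((key, value) -> value != null)
    .to("output");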



Cheers

Loïc, Damien and Sébastien


De : Sophie Blee-Goldman 
Envoyé : samedi 6 avril 2024 01:08
À : dev@kafka.apache.org 
Objet : [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for 
exceptions occuring during processing


Hi Damien,

First off thanks for the KIP, this is definitely a much needed feature. On
the
whole it seems pretty straightforward and I am in favor of the proposal.
Just
a few questions and suggestions here and there:

1. One of the #handle method's parameters is "ProcessorNode node", but
ProcessorNode is an internal class (and would expose a lot of internals
that we probably don't want to pass in to an exception handler). Would it
be sufficient to just make this a String and pass in the processor name?

2. Another of the parameters is the ProcessorContext. This would enable
the handler to potentially forward records, which imo should not be done
from the handler since it could only ever call #forward but not direct where
the record is actually forwarded to, and could cause confusion if users
aren't aware that the handler is effectively calling from the context of the
processor that threw the exception.
2a. If you don't explicitly want the ability to forward records, I would
suggest changing the type of this parameter to ProcessingContext, which
has all the metadata and useful info of the ProcessorContext but without
the
forwarding APIs. This would also let us sidestep the following issue:
2b. If you *do* want the ability to forward records, setting aside whether
that
in and of itself makes sense to do, we would need to pass in either a regular
ProcessorContext or a FixedKeyProcessorContext, depending on what kind
of processor it is. I'm not quite sure how we could design a clean API here,
so I'll hold off until you clarify whether you even want forwarding or not.
We would also need to split the input record into a Record vs FixedKeyRecord

3. One notable difference between this handler and the existing ones you
pointed out, the Deserialization/ProductionExceptionHandler, is that the
records passed in to those are in serialized bytes, whereas the record
here would be POJOs. You account for this by making the parameter
type a Record<Object, Object>, but I just wonder how users would be
able to read the key/value and figure out what type it should be. For
example, would they need to maintain a map from processor name to
input record types?

If you could provide an example of this new feature in the KIP, it would be
very helpful in understanding whether we could do something to make it
easier for users to use, or if it would be fine as-is

4. We should include all the relevant info for a new metric, such as the
metric
group and recording level. You can look at other metrics KIPs like KIP-444
and KIP-613 for an example. I suspect you intend for this to be in the
processor group and at the INFO level?

Hope that all makes sense! Thanks again for the KIP

-Sophie

On Fri, Mar 29, 2024 at 6:16 AM Damien Gasparina 
wrote:

> Hi everyone,
>
> After writing quite a few Kafka Streams applications, my colleagues and I
> just created KIP-1033 to introduce a new Exception Handler in Kafka Streams
> to simplify error handling.
> This feature would allow defining an exception handler to automatically
> catch exceptions occurring during the processing of a message.
>
> KIP link:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
>
> Feedback and suggestions are welcome,
>
> Cheers,
> Damien, Sebastien and Loic
>





Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-05 Thread Sophie Blee-Goldman
Hi Damien,

First off thanks for the KIP, this is definitely a much needed feature. On
the
whole it seems pretty straightforward and I am in favor of the proposal.
Just
a few questions and suggestions here and there:

1. One of the #handle method's parameters is "ProcessorNode node", but
ProcessorNode is an internal class (and would expose a lot of internals
that we probably don't want to pass in to an exception handler). Would it
be sufficient to just make this a String and pass in the processor name?

2. Another of the parameters is the ProcessorContext. This would enable
the handler to potentially forward records, which imo should not be done
from the handler since it could only ever call #forward but not direct where
the record is actually forwarded to, and could cause confusion if users
aren't aware that the handler is effectively calling from the context of the
processor that threw the exception.
2a. If you don't explicitly want the ability to forward records, I would
suggest changing the type of this parameter to ProcessingContext, which
has all the metadata and useful info of the ProcessorContext but without
the
forwarding APIs. This would also let us sidestep the following issue:
2b. If you *do* want the ability to forward records, setting aside whether
that
in and of itself makes sense to do, we would need to pass in either a regular
ProcessorContext or a FixedKeyProcessorContext, depending on what kind
of processor it is. I'm not quite sure how we could design a clean API here,
so I'll hold off until you clarify whether you even want forwarding or not.
We would also need to split the input record into a Record vs FixedKeyRecord

3. One notable difference between this handler and the existing ones you
pointed out, the Deserialization/ProductionExceptionHandler, is that the
records passed in to those are in serialized bytes, whereas the record
here would be POJOs. You account for this by making the parameter
type a Record<Object, Object>, but I just wonder how users would be
able to read the key/value and figure out what type it should be. For
example, would they need to maintain a map from processor name to
input record types?

If you could provide an example of this new feature in the KIP, it would be
very helpful in understanding whether we could do something to make it
easier for users to use, or if it would be fine as-is

4. We should include all the relevant info for a new metric, such as the
metric
group and recording level. You can look at other metrics KIPs like KIP-444
and KIP-613 for an example. I suspect you intend for this to be in the
processor group and at the INFO level?

Hope that all makes sense! Thanks again for the KIP

-Sophie

On Fri, Mar 29, 2024 at 6:16 AM Damien Gasparina 
wrote:

> Hi everyone,
>
> After writing quite a few Kafka Streams applications, my colleagues and I
> just created KIP-1033 to introduce a new Exception Handler in Kafka Streams
> to simplify error handling.
> This feature would allow defining an exception handler to automatically
> catch exceptions occurring during the processing of a message.
>
> KIP link:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
>
> Feedback and suggestions are welcome,
>
> Cheers,
> Damien, Sebastien and Loic
>