Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-06-13 Thread Damien Gasparina
ce
> > so that it can also produce to DLQ? AFAIK it's not covered by any of
> > the existing exception handlers, but it can cause similar failures
> > (potentially custom logic, depends on validity input record). There
> > could also be a default implementation as a subclass of
> > `ExtractRecordMetadataTimestamp`.
> >
> > L3. It would be nice to include an example of how to produce to
> > multiple topics in the KIP, as I can imagine that this will be a
> > common use-case. I wasn't sure how much code would be involved to make
> > it work. If a lot of code is required, we may want to consider
> > exposing some utils that make it easier.
> >
> > Cheers,
> > Lucas
> >
> >
> >
> >
> > On Sun, Apr 21, 2024 at 7:58 PM Damien Gasparina  
> > wrote:
> >>
> >> Hi everyone,
> >>
> >> Following all the discussion on this KIP and KIP-1033, we introduced a
> >> new container class containing only processing context metadata:
> >> ProcessingMetadata. This new container class is actually part of
> >> KIP-1033, thus, I added a hard dependency for this KIP on KIP-1033, I
> >> think it's the wisest implementation wise.
> >>
> >> I also clarified the interface of the enums:
> >> withDeadLetterQueueRecords(Iterable<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords). Very likely most users would just
> >> send one DLQ record, but there might be specific use-cases and what
> >> can do more can do less, so I added an Iterable.
> >>
> >> I took some time to think about the impact of storing the
> >> ProcessingMetadata on the ProductionExceptionHandler. I think storing
> >> the topic/offset/partition should be fine, but I am concerned about
> >> storing the rawSourceKey/Value. I think it could impact some specific
> >> use-cases, for example, a high-throughput Kafka Streams application
> >> "counting" messages could have huge source input messages, and very
> >> small sink messages, here, I assume storing the rawSourceKey/Value
> >> could significantly require more memory than the actual Kafka Producer
> >> buffer.
> >>
> >> I think the safest approach is actually to only store the fixed-size
> >> metadata for the ProductionExceptionHandler.handle:
> >> topic/partition/offset/processorNodeId/taskId, it might be confusing
> >> for the user, but 1) it is still better than nowaday where there are
> >> no context information at all, 2) it would be clearly stated in the
> >> javadoc, 3) the rawSourceKey/Value are already nullable (e.g. the
> >> punctuate case). .
> >>
> >> Do you think it would be a suitable design Sophie?
> >>
> >> Cheers,
> >> Damien
> >>
> >> On Sun, 14 Apr 2024 at 21:30, Loic Greffier  
> >> wrote:
> >>>
> >>> Hi Sophie,
> >>>
> >>> Thanks for your feedback.
> >>> Completing Damien's comments here for points S1 and S5B.
> >>>
> >>> S1:
> >>>> I'm confused -- are you saying that we're introducing a new kind of 
> >>>> ProducerRecord class for this?
> >>>
> >>> I am wondering if it makes sense to alter the ProducerRecord from Clients 
> >>> API with a "deadLetterQueueTopicName" attribute dedicated to Kafka 
> >>> Streams DLQ.
> >>> Adding "deadLetterQueueTopicName" as an additional parameter to 
> >>> "withDeadLetterQueueRecord" is a good option, and may allow users to send 
> >>> records to different DLQ topics depending on conditions:
> >>> @Override
> >>> public ProductionExceptionHandlerResponse handle(final ProcessingContext context,
> >>>                                                  ProducerRecord<byte[], byte[]> record,
> >>>                                                  Exception exception) {
> >>>     if (condition1) {
> >>>         return ProductionExceptionHandlerResponse.CONTINUE
> >>>             .withDeadLetterQueueRecord(record, "dlq-topic-a");
> >>>     }
> >>>     if (condition2) {
> >>>         return ProductionExceptionHandlerResponse.CONTINUE
> >>>             .withDeadLetterQueueRecord(record, "dlq-topic-b");
> >>>     }
> >>>     return ProductionExceptionHandlerResponse.CONTINUE
> >>>         .withDeadLetterQueueRecord(record, "dlq-topic-c");
> >>> }
> >>>
> >>> S5B:
> >>>> I was having a bit of trou

Re: [VOTE] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-05-15 Thread Damien Gasparina
Thanks Lianet, I’ll conclude the vote with 4 binding votes from
Matthias J. Sax, Sophie Blee-Goldman, Bill Bejeck and Bruno Cadonna.


Thank you everyone!

On Tue, 14 May 2024 at 17:56, Lianet M.  wrote:
>
> +1 (non-binding)
>
> Thanks for the KIP and updates!
>
> Lianet
>
> On Tue, May 14, 2024, 12:03 a.m. Matthias J. Sax  wrote:
>
> > +1 (binding)
> >
> > On 5/13/24 5:54 PM, Sophie Blee-Goldman wrote:
> > > Thanks for the KIP guys!
> > >
> > > +1 (binding)
> > >
> > > On Mon, May 13, 2024 at 6:02 AM Bill Bejeck  wrote:
> > >
> > >> Thanks for the KIP, this will be a great addition!
> > >>
> > >> +1(binding)
> > >>
> > >> -Bill
> > >>
> > >> On Fri, May 3, 2024 at 4:48 AM Bruno Cadonna 
> > wrote:
> > >>
> > >>> Hi Damien, Sébastien, and Loïc,
> > >>>
> > >>> Thanks for the KIP!
> > >>>
> > >>> +1 (binding)
> > >>>
> > >>> Best,
> > >>> Bruno
> > >>>
> > >>>
> > >>> On 4/26/24 4:00 PM, Damien Gasparina wrote:
> > >>>> Hi all,
> > >>>>
> > >>>> We would like to start a vote for KIP-1033: Add Kafka Streams
> > >>>> exception handler for exceptions occurring during processing
> > >>>>
> > >>>> The KIP is available on
> > >>>>
> > >>>
> > >>
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occurring+during+processing
> > >>>>
> > >>>> If you have any suggestions or feedback, feel free to participate to
> > >>>> the discussion thread:
> > >>>> https://lists.apache.org/thread/1nhhsrogmmv15o7mk9nj4kvkb5k2bx9s
> > >>>>
> > >>>> Best regards,
> > >>>> Damien Sebastien and Loic
> > >>>
> > >>
> > >
> >


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-05-13 Thread Damien Gasparina
Thanks for the feedback, I think we should keep two separate callbacks
for serialization and production error handling. It makes sense for type
safety (serialized ProducerRecord<byte[], byte[]> vs POJO) and also for
backward compatibility. On top of that, all metadata provided in the
#handle method would need to be held in memory until the producer invokes
its callback; in the future, having two callbacks might avoid confusion,
as some metadata might be provided to #handle and not to
#handleSerializationException. I do think that one method would be
cleaner, but for backward compatibility, type safety and memory
reasons, we should keep two separate callbacks.

As you suggested Sophie, I updated the KIP to introduce a
SerializationExceptionOrigin enum and added the "origin" parameter to
the #handleSerializationException method.
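
For readers following along, here is a rough sketch of what that could
look like; every name below is only an assumption based on this thread,
not the final interface on the KIP page:

import org.apache.kafka.clients.producer.ProducerRecord;

// Placeholder for the metadata container discussed in KIP-1033.
interface ErrorHandlerContext { }

// Sketch of the two-callback shape discussed above -- not the released
// ProductionExceptionHandler interface.
interface ProductionExceptionHandlerSketch {

    // Which part of the record failed to serialize.
    enum SerializationExceptionOrigin { KEY, VALUE }

    enum Response { CONTINUE, FAIL }

    // Plain production failures: the record is already serialized to byte[].
    Response handle(ErrorHandlerContext context,
                    ProducerRecord<byte[], byte[]> record,
                    Exception exception);

    // Serialization failures: the record still carries the POJO key/value,
    // plus the origin of the failure.
    default Response handleSerializationException(ErrorHandlerContext context,
                                                  ProducerRecord<?, ?> record,
                                                  Exception exception,
                                                  SerializationExceptionOrigin origin) {
        return Response.FAIL;
    }
}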

On Sat, 11 May 2024 at 07:30, Sophie Blee-Goldman  wrote:
>
> Whoops, just noticed there is already a voting thread for this. Hard to
> keep track with all the KIPs going on right now!
>
> In that case I'll just wait for the SerializationExceptionOrigin thing to
> be added and then I'll vote. Should definitely be able to make 3.8 in this
> case :D
>
> On Fri, May 10, 2024 at 10:10 PM Sophie Blee-Goldman 
> wrote:
>
> > Sounds like we're more or less in agreement here. I think the KIP just
> > needs one small update still, which is the SerializationExceptionOrigin
> > enum.
> >
> > As discussed there are a few options for this and we're all happy to defer
> > to the preference of the KIP authors, but if we keep the KIP as-is with the
> > two separate methods in the ProductionExceptionHandler, then imo it makes
> > the most sense to add the SerializationExceptionOrigin enum to the
> > ProductionExceptionHandler interface and then add an "origin" parameter to
> > the new  #handleSerializationException method. However you decide to do it,
> > I'm personally happy to vote on this KIP once the KIP is updated.
> >
> >  Just FYI the 3.8 KIP freeze is this upcoming Wednesday, so if you guys
> > would like to target 3.8 for this feature, just make sure to update the KIP
> > and kick off a [VOTE] thread by EOD Monday so that you can close the vote
> > by EOD Wednesday (since it has to be open for 72 hours).
> >
> > Thanks again for this sorely needed feature!
> >
> > On Fri, May 10, 2024 at 10:06 AM Bill Bejeck  wrote:
> >
> >> Great KIP discussion so far by everyone.
> >> At this point, I'm in agreement with the direction and current state of
> >> the
> >> KIP.
> >>
> >> As for having two separate callbacks for the ProductionExceptionHandler,
> >> I'm somewhat split in that I agree with points raised by Sophie and
> >> Matthias with my final
> >> position being to maintain both callbacks.  IMHO, while there are several
> >> things that could go wrong with producing a message, it seems that
> >> serialization exceptions would be the most common, although I don't have
> >> any data to back that opinion up.  But having said that, should the KIP
> >> authors decide otherwise, I would be fine with that approach as well.
> >>
> >> I'm at the point where I'm comfortable voting for this KIP.
> >>
> >> Thanks,
> >> Bill
> >>
> >> On Thu, May 9, 2024 at 4:28 PM Sophie Blee-Goldman  >> >
> >> wrote:
> >>
> >> > The type safety issue is definitely not solved by having two separate
> >> > callbacks. I just think it gets a bit worse by mashing them into one
> >> > method. At least in the plain #handle method you can be sure that the type
> >> > is ProducerRecord<byte[], byte[]> and in #handleSerialization the type is
> >> > some POJO.
> >> >
> >> > And in theory you can just embed the mapping of sink topics to
> >> type/Serde
> >> > based on your topology. Or let's say your output record keys & values
> >> are
> >> > all Strings, and you want to print the String representation in your
> >> > handler, rather than the bytes.
> >> > Having a separate callback means knowing you can simply print the
> >> > ProducerRecord's key/value in the #handleSerialization method, and will
> >> > have to use a StringDeserializer to convert the key/value to its String
> >> > form to print it in the #handle method.
> >> >
> >> > Again, I just feel this will be more straightforward and easy for users
> >> to
> >> > use correctly, but am satisfied either way. I'll shut up now and wait
> >> for
> >> > the KIP authors to make a call on this one way or another, and then I'm
> >> > happy to cast my vote
> >> >
> >> > On Thu, May 9, 2024 at 10:15 AM Matthias J. Sax 
> >> wrote:
> >> >
> >> > > Thanks Sophie! Makes it much clearer where you are coming from.
> >> > >
> >> > > About the Type unsafety: isn't this also an issue for the
> >> > > `handleSerializationException` case, because the handler is used for all
> >> > > sink topics, and thus key/value types are not really known w/o taking the
> >> > > sink topic into account? -- So I am not sure if having two handler
> >> > > methods really helps much with regard to type safety?
> >> > >
> >> > > Just want to make this 

[VOTE] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-04-26 Thread Damien Gasparina
Hi all,

We would like to start a vote for KIP-1033: Add Kafka Streams
exception handler for exceptions occurring during processing

The KIP is available on
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occurring+during+processing

If you have any suggestions or feedback, feel free to participate in
the discussion thread:
https://lists.apache.org/thread/1nhhsrogmmv15o7mk9nj4kvkb5k2bx9s

Best regards,
Damien, Sebastien and Loic


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-23 Thread Damien Gasparina
AFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions).
> I think it's sufficient to only add a single new method which
> "blends/unifies" both exiting ones:
>
>handle(final ErrorHandlerContext context,
>   final ProducerRecord record, // no generic types
>   final Exception exception)
>
>
> 107 `DeserializationExceptionHandler`: same question as above, about the
> default impl and letting it throw an exception.
>
>
> 108 `default.process.exception.handler`: we use the prefix `default.`
> for both existing handlers, because we allow passing in topic-specific
> handlers via `Consumed` and `Produced` overwrites, i.e., the default can
> be overwritten. We don't want to allow passing in a Processor-specific
> handler, as pointed out in the "Rejected Alternatives" section, and thus the
> configured handler is not really a "default" as it cannot be
> overwritten. For this case, we should drop the `default.` prefix in the
> config name.
>
>
> 109: Lucas brought up the idea on the KIP-1034 discussion to also
> include the `TimestampExtractor` interface for DLQ, which I think makes a lot
> of sense. Not sure if we would need any extensions in this KIP to get
> this done? I would rather include the timestamp extraction issue in the DLQ
> KIP from day one. The interface is quite different though, so we
> would need to think in a bit more detail about how to do
> this. Right now, the contract is that returning `-1` as the extracted
> timestamp is an implicit "drop record" signal to the runtime, which is
> rather subtle. Can we do anything about this in a meaningful way?
>
>
>
> -Matthias
>
> On 4/22/24 8:20 AM, Damien Gasparina wrote:
> > Hi Bruno,
> >
> > 1) Good point, naming stuff is definitely hard, I renamed
> > ProcessingMetadata to ErrorHandlerContext
> >
> >> Is there any reason you did not use something like
> >> Record<byte[], byte[]> sourceRecord()
> >
> > 2) The record class is used in many locations and, I assume, could be
> > expanded by new features.
> > As the error handler metadata could be stored in memory for a longer
> > duration due to the ProductionExceptionHandler, I think it is wiser to
> > have an independent class that is the leanest possible.
> > Maybe I am overthinking this, what do you think?
> >
> > 3) Oops, good point, I didn't notice that it was still there. I
> > removed the parameter.
> >
> > 4) To clarify the KIP, I specified the DeserializationExceptionHandler
> > interface in the KIP.
> >
> > 5) Good point, I updated the KIP with the order you mentioned:
> > context, record, exception
> >
> > 6) I was indeed thinking of extending the ProcessingContext to store
> > it there. Let me update the KIP to make it clear.
> >
> >  From past experience, deprecating an interface is always a bit
> > painful, in this KIP, I relied on default implementation to ensure
> > backward compatibility while encouraging people to implement the new
> > method signature. If you know a better approach, I'll take :-)
> >
> > Cheers,
> > Damien
> >
> > On Mon, 22 Apr 2024 at 11:01, Bruno Cadonna  wrote:
> >>
> >> Hi Damien,
> >>
> >> Thanks a lot for the updates!
> >>
> >> I have the following comments:
> >>
> >> (1)
> >> Could you rename ProcessingMetadata to ErrorHandlerContext or
> >> ErrorHandlerMetadata (I am preferring the former)? I think it makes it
> >> clearer for what this context/metadata is for.
> >>
> >>
> >> (2)
> >> Is there any reason you did not use something like
> >>
> >> Record<byte[], byte[]> sourceRecord()
> >>
> >> in ProcessingMetadata instead of sourceRawKey() and sourceRawValue() and
> >> headers()? The headers() method refers to the record read from the input
> >> topic of the sub-topology, right? If yes, maybe that is also something
> >> to mention more explicitly.
> >>
> >>
> >> (3)
> >> Since you added the processor node ID to the ProcessingMetadata, you can
> >> remove it from the signature of method handle() in
> >> ProcessingExceptionHandler.
> >>
> >>
> >> (4)
> >> Where are the mentioned changes to the DeserializationExceptionHandler?
> >>
> >>
> >> (5)
> >> To be consistent, the order of the parameters in the
> >> ProductionExceptionHandler should be:
> >> 1. context
> >> 2. record
> >> 3. exception
> >>
> >>

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-22 Thread Damien Gasparina
Hi Bruno,

1) Good point, naming stuff is definitely hard, I renamed
ProcessingMetadata to ErrorHandlerContext

> Is there any reason you did not use something like
> Record<byte[], byte[]> sourceRecord()

2) The record class is used in many locations and, I assume, could be
expanded by new features.
As the error handler metadata could be stored in memory for a longer
duration due to the ProductionExceptionHandler, I think it is wiser to
have an independent class that is the leanest possible.
Maybe I am overthinking this, what do you think?

3) Oops, good point, I didn't notice that it was still there. I
removed the parameter.

4) To clarify the KIP, I specified the DeserializationExceptionHandler
interface in the KIP.

5) Good point, I updated the KIP with the order you mentioned:
context, record, exception

6) I was indeed thinking of extending the ProcessingContext to store
it there. Let me update the KIP to make it clear.

From past experience, deprecating an interface is always a bit
painful; in this KIP, I relied on default implementations to ensure
backward compatibility while encouraging people to implement the new
method signature. If you know a better approach, I'll take it :-)
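
To make the pattern concrete, here is a small sketch of the idea; it is
simplified (the real methods carry more parameters) and all names are
placeholders rather than the exact KIP interfaces:

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Placeholder for the metadata container introduced by KIP-1033.
interface ErrorHandlerContext { }

// Sketch of the deprecation pattern only -- not the actual Kafka Streams interface.
interface DeserializationExceptionHandlerSketch {

    enum Response { CONTINUE, FAIL }

    // Old callback, kept and deprecated for backward compatibility.
    @Deprecated
    Response handle(ConsumerRecord<byte[], byte[]> record, Exception exception);

    // New callback that the runtime would invoke. Its default implementation
    // falls back to the deprecated method, so existing handlers keep working
    // unchanged, while new handlers override this signature instead.
    default Response handle(ErrorHandlerContext context,
                            ConsumerRecord<byte[], byte[]> record,
                            Exception exception) {
        return handle(record, exception);
    }
}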

Cheers,
Damien

On Mon, 22 Apr 2024 at 11:01, Bruno Cadonna  wrote:
>
> Hi Damien,
>
> Thanks a lot for the updates!
>
> I have the following comments:
>
> (1)
> Could you rename ProcessingMetadata to ErrorHandlerContext or
> ErrorHandlerMetadata (I prefer the former)? I think it makes it
> clearer what this context/metadata is for.
>
>
> (2)
> Is there any reason you did not use something like
>
> Record<byte[], byte[]> sourceRecord()
>
> in ProcessingMetadata instead of sourceRawKey() and sourceRawValue() and
> headers()? The headers() method refers to the record read from the input
> topic of the sub-topology, right? If yes, maybe that is also something
> to mention more explicitly.
>
>
> (3)
> Since you added the processor node ID to the ProcessingMetadata, you can
> remove it from the signature of method handle() in
> ProcessingExceptionHandler.
>
>
> (4)
> Where are the mentioned changes to the DeserializationExceptionHandler?
>
>
> (5)
> To be consistent, the order of the parameters in the
> ProductionExceptionHandler should be:
> 1. context
> 2. record
> 3. exception
>
>
> (6)
> I am wondering where the implementation of ProcessingMetadata gets the
> sourceRawKey/Value from. Do we need additional changes in
> ProcessingContext and implementations?
>
>
> Best,
> Bruno
>
>
> On 4/21/24 2:23 PM, Damien Gasparina wrote:
> > Hi Everyone,
> >
> > Following the discussions on KIP-1033 and KIP-1034, we did a few changes:
> >- We introduced a new ProcessingMetadata class containing only the
> > ProcessorContext metadata: topic, partition, offset, headers[],
> > sourceRawKey, sourceRawValue, TaskId, ProcessorNodeName
> >- To be consistent, we propose to deprecate the existing
> > DeserializationExceptionHandler and ProductionExceptionHandler methods
> > to rely on the new ProcessingMetadata
> >- The creation and the ProcessingMetadata and the deprecation of old
> > methods is owned by KIP-1033, KIP-1034 (DLQ) is now only focusing on
> > Dead Letter Queue implementation without touching any interfaces. We
> > introduced a hard dependency for KIP-1034 regarding KIP-1033, we think
> > it's the wisest implementation wise.
> > - Instead of creating a new metric, KIP-1033 updates the
> > dropped-record metric.
> >
> > Let me know what you think, if everything's fine, I think we should be
> > good to start a VOTE?
> >
> > Cheers,
> > Damien
> >
> >
> >
> >
> >
> > On Fri, 12 Apr 2024 at 22:24, Sophie Blee-Goldman  
> > wrote:
> >>
> >> Fully agree about creating a new class for the bits of ProcessingContext
> >> that are specific to metadata only. In fact, more or less this same point
> >> just came up in the related KIP 1034 for DLQs, since the RecordMetadata
> >> can't always be trusted to remain immutable. Maybe it's possible to solve
> >> both issues at once, with the same class?
> >>
> >> On another related note -- I had actually also just proposed that we
> >> deprecate the existing DeserializationExceptionHandler method and replace
> >> it with one using the new PAPI as part of KIP-1034. But now that I'm
> >> reading this, I would say it probably makes more sense to do in this KIP.
> >> We can also push that out into a smaller-scoped third KIP if you want, but
> >> clearly, there is some overlap here and so however you guys (the authors)
> >> want to organize 

Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-21 Thread Damien Gasparina
Hi everyone,

Following all the discussion on this KIP and KIP-1033, we introduced a
new container class containing only processing context metadata:
ProcessingMetadata. This new container class is actually part of
KIP-1033; thus, I added a hard dependency for this KIP on KIP-1033, as I
think it is the wisest approach implementation-wise.

I also clarified the interface of the enums:
withDeadLetterQueueRecords(Iterable<ProducerRecord<byte[], byte[]>>
deadLetterQueueRecords). Very likely most users would just send one DLQ
record, but there might be specific use-cases, and the more general
signature also covers the simple case, so I added an Iterable.
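
To make this concrete, here is a minimal sketch of a handler fanning out
to several DLQ topics. The response type below is only a placeholder
mirroring the proposed withDeadLetterQueueRecords(...) API; none of
these names are final:

import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;

// Placeholder response mirroring the proposed CONTINUE/FAIL + DLQ-records shape.
final class ResponseSketch {
    static final ResponseSketch CONTINUE = new ResponseSketch(List.of());

    private final Iterable<ProducerRecord<byte[], byte[]>> dlqRecords;

    private ResponseSketch(Iterable<ProducerRecord<byte[], byte[]>> dlqRecords) {
        this.dlqRecords = dlqRecords;
    }

    ResponseSketch withDeadLetterQueueRecords(Iterable<ProducerRecord<byte[], byte[]>> records) {
        return new ResponseSketch(records);
    }
}

class FanOutDlqHandlerSketch {
    // Most handlers would emit a single DLQ record, but the Iterable also allows
    // fan-out, e.g. one copy to an application-wide DLQ and one to a per-domain DLQ.
    ResponseSketch handle(ProducerRecord<byte[], byte[]> failedRecord, Exception exception) {
        ProducerRecord<byte[], byte[]> appWideDlq =
                new ProducerRecord<>("my-app-dlq", failedRecord.key(), failedRecord.value());
        ProducerRecord<byte[], byte[]> ordersDlq =
                new ProducerRecord<>("orders-dlq", failedRecord.key(), failedRecord.value());
        return ResponseSketch.CONTINUE.withDeadLetterQueueRecords(List.of(appWideDlq, ordersDlq));
    }
}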

I took some time to think about the impact of storing the
ProcessingMetadata on the ProductionExceptionHandler. I think storing
the topic/offset/partition should be fine, but I am concerned about
storing the rawSourceKey/Value. It could impact some specific
use-cases: for example, a high-throughput Kafka Streams application
"counting" messages could have huge source input messages and very
small sink messages; in that case, storing the rawSourceKey/Value
could require significantly more memory than the actual Kafka Producer
buffer.

I think the safest approach is actually to only store the fixed-size
metadata for ProductionExceptionHandler.handle:
topic/partition/offset/processorNodeId/taskId. It might be confusing
for the user, but 1) it is still better than today, where there is no
context information at all, 2) it would be clearly stated in the
javadoc, and 3) the rawSourceKey/Value are already nullable (e.g. the
punctuate case).
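
As an illustration of what a handler could do with just that fixed-size
metadata, here is a small sketch; the context type and its accessors are
assumptions based on this thread, not a released API:

import org.apache.kafka.streams.processor.TaskId;

// Hypothetical accessors matching the fixed-size metadata listed above.
interface FixedSizeErrorMetadataSketch {
    String topic();
    int partition();
    long offset();
    String processorNodeId();
    TaskId taskId();
    byte[] rawSourceKey();   // documented as nullable, e.g. for punctuate-triggered errors
    byte[] rawSourceValue(); // nullable as well
}

class LoggingProductionHandlerSketch {
    void onError(FixedSizeErrorMetadataSketch context, Exception exception) {
        // The fixed-size metadata is always safe to read and cheap to retain.
        System.err.printf("Production error in task %s, node %s, at %s-%d@%d: %s%n",
                context.taskId(), context.processorNodeId(),
                context.topic(), context.partition(), context.offset(), exception);

        // The raw key/value must be treated as optional, per the javadoc note above.
        byte[] rawKey = context.rawSourceKey();
        if (rawKey != null) {
            System.err.println("raw source key size: " + rawKey.length + " bytes");
        }
    }
}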

Do you think it would be a suitable design Sophie?

Cheers,
Damien

On Sun, 14 Apr 2024 at 21:30, Loic Greffier  wrote:
>
> Hi Sophie,
>
> Thanks for your feedback.
> Completing Damien's comments here for points S1 and S5B.
>
> S1:
> > I'm confused -- are you saying that we're introducing a new kind of 
> > ProducerRecord class for this?
>
> I am wondering if it makes sense to alter the ProducerRecord from the Clients API 
> with a "deadLetterQueueTopicName" attribute dedicated to Kafka Streams DLQ.
> Adding "deadLetterQueueTopicName" as an additional parameter to 
> "withDeadLetterQueueRecord" is a good option, and may allow users to send 
> records to different DLQ topics depending on conditions:
> @Override
> public ProductionExceptionHandlerResponse handle(final ProcessingContext context,
>                                                  ProducerRecord<byte[], byte[]> record,
>                                                  Exception exception) {
>     if (condition1) {
>         return ProductionExceptionHandlerResponse.CONTINUE
>             .withDeadLetterQueueRecord(record, "dlq-topic-a");
>     }
>     if (condition2) {
>         return ProductionExceptionHandlerResponse.CONTINUE
>             .withDeadLetterQueueRecord(record, "dlq-topic-b");
>     }
>     return ProductionExceptionHandlerResponse.CONTINUE
>         .withDeadLetterQueueRecord(record, "dlq-topic-c");
> }
>
> S5B:
> > I was having a bit of trouble understanding what the behavior would be if 
> > someone configured a "errors.deadletterqueue.topic.name" but didn't 
> > implement the handlers.
>
> The provided LogAndContinueExceptionHandler, LogAndFailExceptionHandler and 
> DefaultProductionExceptionHandler should be able to tell if records should be 
> sent to DLQ or not.
> The "errors.deadletterqueue.topic.name" takes place to:
>
>   *   Specifying if the provided handlers should or should not send records 
> to DLQ.
>  *   If the value is empty, the handlers should not send records to DLQ.
>  *   If the value is not empty, the handlers should send records to DLQ.
>   *   Define the name of the DLQ topic that should be used by the provided 
> handlers.
>
> Thus, if "errors.deadletterqueue.topic.name" is defined, the provided 
> handlers should return either:
>
>   *   CONTINUE.withDeadLetterQueueRecord(record, defaultDeadLetterQueue)
>   *   FAIL.withDeadLetterQueueRecord(record, defaultDeadLetterQueue).
> If "errors.deadletterqueue.topic.name" is defined but neither 
> DeserializationExceptionHandler nor ProductionExceptionHandler classes are 
> defined in the configuration, then nothing should happen as sending to DLQ is 
> based on handlers’ response.
> When providing custom handlers, users would have the possibility to return:
>
>   *   FAIL
>   *   CONTINUE
>   *   FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
>   *   CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic")
>
> A DLQ topic name is currently required when using the last two response types.
> I am wondering if it could benefit users to ease the use of the default DLQ 
> topic

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-21 Thread Damien Gasparina
Hi Everyone,

Following the discussions on KIP-1033 and KIP-1034, we did a few changes:
  - We introduced a new ProcessingMetadata class containing only the
ProcessorContext metadata: topic, partition, offset, headers[],
sourceRawKey, sourceRawValue, TaskId, ProcessorNodeName (a rough sketch
follows below this list)
  - To be consistent, we propose to deprecate the existing
DeserializationExceptionHandler and ProductionExceptionHandler methods
to rely on the new ProcessingMetadata
  - The creation of the ProcessingMetadata and the deprecation of the old
methods is owned by KIP-1033; KIP-1034 (DLQ) is now only focusing on
the Dead Letter Queue implementation without touching any interfaces. We
introduced a hard dependency of KIP-1034 on KIP-1033, as we think
it is the wisest approach implementation-wise.
  - Instead of creating a new metric, KIP-1033 updates the
dropped-record metric.
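
Here is that rough sketch of a metadata-only container; field names
follow the list above, but this is purely illustrative and the final
shape is owned by KIP-1033:

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.TaskId;

// Illustrative only: an immutable, metadata-only container.
public class ProcessingMetadataSketch {
    private final String topic;
    private final int partition;
    private final long offset;
    private final Headers headers;
    private final byte[] sourceRawKey;   // nullable, e.g. for punctuate-triggered errors
    private final byte[] sourceRawValue; // nullable as well
    private final TaskId taskId;
    private final String processorNodeName;

    public ProcessingMetadataSketch(String topic, int partition, long offset,
                                    Headers headers, byte[] sourceRawKey,
                                    byte[] sourceRawValue, TaskId taskId,
                                    String processorNodeName) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.headers = headers;
        this.sourceRawKey = sourceRawKey;
        this.sourceRawValue = sourceRawValue;
        this.taskId = taskId;
        this.processorNodeName = processorNodeName;
    }

    public String topic() { return topic; }
    public int partition() { return partition; }
    public long offset() { return offset; }
    public Headers headers() { return headers; }
    public byte[] sourceRawKey() { return sourceRawKey; }
    public byte[] sourceRawValue() { return sourceRawValue; }
    public TaskId taskId() { return taskId; }
    public String processorNodeName() { return processorNodeName; }
}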

Let me know what you think, if everything's fine, I think we should be
good to start a VOTE?

Cheers,
Damien





On Fri, 12 Apr 2024 at 22:24, Sophie Blee-Goldman  wrote:
>
> Fully agree about creating a new class for the bits of ProcessingContext
> that are specific to metadata only. In fact, more or less this same point
> just came up in the related KIP 1034 for DLQs, since the RecordMetadata
> can't always be trusted to remain immutable. Maybe it's possible to solve
> both issues at once, with the same class?
>
> On another related note -- I had actually also just proposed that we
> deprecate the existing DeserializationExceptionHandler method and replace
> it with one using the new PAPI as part of KIP-1034. But now that I'm
> reading this, I would say it probably makes more sense to do in this KIP.
> We can also push that out into a smaller-scoped third KIP if you want, but
> clearly, there is some overlap here and so however you guys (the authors)
> want to organize this part of the work is fine with me. I do think it
> should be done alongside/before this KIP and 1034 though, for all the
> reasons already stated.
>
> Everything else in the discussion so far I agree with! The
> ProcessingContext thing is the only open question in my mind
>
> On Thu, Apr 11, 2024 at 5:41 AM Damien Gasparina 
> wrote:
>
> > Hi Matthias, Bruno,
> >
> > 1.a During my previous comment, by Processor Node ID, I meant
> > Processor name. This is important information to expose in the handler
> > as it allows users to identify the location of the exception in the
> > topology.
> > I assume this information could be useful in other places, that's why
> > I would lean toward adding this as an attribute in the
> > ProcessingContext.
> >
> > 1.b Looking at the ProcessingContext, I do think the following 3
> > methods should not be accessible in the exception handlers:
> > getStateStore(), schedule() and commit().
> > Having a separate interface would make a cleaner signature. It would
> > also be a great time to ensure that all exception handlers are
> > consistent, at the moment, the
> > DeserializationExceptionHandler.handle() relies on the old PAPI
> > ProcessorContext and the ProductionExceptionHandler.handle() has none.
> > It could make sense to build the new interface in this KIP and track
> > the effort to migrate the existing handlers in a separate KIP, what do
> > you think?
> > Maybe I am overthinking this part and the ProcessingContext would be fine.
> >
> > 4. Good point regarding the dropped-record metric, as it is used by
> > the other handlers, I do think it makes sense to leverage it instead
> > of creating a new metric.
> > I will update the KIP to update the dropped-record-metric.
> >
> > 8. Regarding the DSL, I am aligned with Bruno, I think we could close
> > the gaps in a future KIP.
> >
> > Cheers,
> > Damien
> >
> >
> > On Thu, 11 Apr 2024 at 11:56, Bruno Cadonna  wrote:
> > >
> > > Hi Matthias,
> > >
> > >
> > > 1.a
> > > With processor node ID, I mean the ID that is exposed in the tags of
> > > processor node metrics. That ID cannot be internal since it is exposed
> > > in metrics. I think the processor name and the processor node ID is the
> > > same thing. I followed how the processor node ID is set in metrics and I
> > > ended up in addProcessor(name, ...).
> > >
> > >
> > > 1.b
> > > Regarding ProcessingContext, I also thought about a separate class to
> > > pass-in context information into the handler, but then I dismissed the
> > > idea because I thought I was overthinking it. Apparently, I was not
> > > overthinking it if you also had the same idea. So let's consider a
> > > separate class.
> > >
> > >

Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-14 Thread Damien Gasparina
o the DLQ. I think this comes down to two things, though again,
> these aren't necessarily problems with the API and probably just need to be
> hashed out:
>
> S5a.
> When I envision a DLQ, to me, the most common use case would be to forward
> input records that failed somewhere along the processing graph. But it
> seems like all the focus here is on the two far ends of the subtopology --
> the input/consumer, and the output/producer. I get that
> the ProcessingExceptionHandler is really the missing piece here, and it's
> hard to say anything specific since it's not yet accepted, but maybe a
> somewhat more concrete example would help. FWIW I think/hope to get that
> KIP accepted and implementation ASAP, so I'm not worried about the "what if
> it doesn't happen" case -- more just want to know what it will look like
> when it does. Imo it's fine to build KIPs on top of future ones, it feels
> clear that this part will just have to wait for that KIP to actually be
> added.
>
> S5b:
> Why do users have to define the entire ProducerRecord -- shouldn't Streams
> handle all this for them? Or will we just automatically send every record
> on failure to the default global DLQ, and users only have to implement the
> handlers if they want to change the headers or send to a different topic? I
> was having a bit of trouble understanding what the behavior would be if
> someone configured a "errors.deadletterqueue.topic.name" but didn't
> implement the handlers. Apologies if it's somewhere in the KIP and I
> happened to miss it!
>
> Either way, I really think an example would help me to better imagine what
> this will look like in practice, and evaluate whether it actually involves
> as much overhead as I'm worried it will. Can you add a section that
> includes a basic  implementation of all the features here? Nothing too
> complicated, just the most bare-bones code needed to actually implement
> forwarding to a dead-letter-queue via the handlers.
>
> Lastly, two super small things:
>
> S6:
> We use camel case in Streams, so it should be rawSourceKey/Value rather
> than raw_source_key/value
>
> S7:
> Can you add javadocs for the #withDeadLetterQueueRecord? For example, it
> seems to me that if the topic to be sent to here is different than the
> default/global DLQ, then the user will need to make sure to have created
> this themselves up front.
>
> That's it from me...sorry for the long response, it's just because I'm
> excited for this feature and have been waiting on a KIP for this for years.
>
> Cheers,
> Sophie
>
>
> On Fri, Apr 12, 2024 at 11:10 AM Damien Gasparina 
> wrote:
>
> > Hi Andrew,
> >
> > Thanks a lot for your review, plenty of good points!
> >
> > 11. Typo fixed, good catch.
> >
> > 12. I do agree with you and Nick also mentioned it, I updated the KIP
> > to mention that context headers should be forwarded.
> >
> > 13. Good catch, to be consistent with KIP-298, and without a strong
> > opinion from my side, I updated the KIP with your prefix proposal.
> >
> > 14. I am not sure about this point, a big difference between KIP-298
> > and this one is that the handlers can easily be overridden, something
> > that is not doable in Kafka Connect.
> > If someone would like a different behavior, e.g. to mask the payload
> > or include further headers, I think we should encourage them to write
> > their own exception handlers to build the DLQ Record the way they
> > expect.
> >
> > 15. Yeah, that's a good point, I was not fully convinced about putting
> > a String in it, I do assume that "null" is also a valid value. I do
> > assume that the Stacktrace and the Exception in this case are the key
> > metadata for the user to troubleshoot the problem.
> > I updated the KIP to mention that the value should be null if
> > triggered in a punctuate.
> >
> > 16. I added a section to mention that Kafka Streams would not try to
> > automatically create the topic and the topic should either be
> > automatically created, or pre-created.
> >
> > 17. If a DLQ record can not be sent, the exception should go to the
> > uncaughtExceptionHandler. Let me clearly state it in the KIP.
> >
> > On Fri, 12 Apr 2024 at 17:25, Damien Gasparina 
> > wrote:
> > >
> > > Hi Nick,
> > >
> > > 1. Good point, that's less impactful than a custom interface, I just
> > > updated the KIP with the new signature.
> > >
> > > 1.a I really meant ProducerRecord, that's the class used to forward to
> > > downstream processors in the PAPI. The only information missing in
> > > this clas

Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-12 Thread Damien Gasparina
Hi Andrew,

Thanks a lot for your review, plenty of good points!

11. Typo fixed, good catch.

12. I do agree with you and Nick also mentioned it, I updated the KIP
to mention that context headers should be forwarded.

13. Good catch, to be consistent with KIP-298, and without a strong
opinion from my side, I updated the KIP with your prefix proposal.

14. I am not sure about this point, a big difference between KIP-298
and this one is that the handlers can easily be overridden, something
that is not doable in Kafka Connect.
If someone would like a different behavior, e.g. to mask the payload
or include further headers, I think we should encourage them to write
their own exception handlers to build the DLQ Record the way they
expect.

15. Yeah, that's a good point. I was not fully convinced about putting
a String in it, and I do assume that "null" is also a valid value. I do
assume that the stacktrace and the exception in this case are the key
metadata for the user to troubleshoot the problem.
I updated the KIP to mention that the value should be null if
triggered in a punctuate.

16. I added a section to mention that Kafka Streams would not try to
create the topic itself; the topic should either be automatically
created (e.g. via broker auto-creation) or pre-created.

17. If a DLQ record can not be sent, the exception should go to the
uncaughtExceptionHandler. Let me clearly state it in the KIP.
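
To illustrate the shape of such a DLQ record, here is a minimal sketch
along those lines: forward the source headers, add the exception
metadata as extra headers, and allow a null value for punctuate-triggered
errors. The header names and the helper itself are assumptions, not the
KIP's final format:

import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class DlqRecordBuilderSketch {

    public static ProducerRecord<byte[], byte[]> buildDlqRecord(String dlqTopic,
                                                                byte[] sourceRawKey,    // may be null
                                                                byte[] sourceRawValue,  // null for punctuate errors
                                                                Headers sourceHeaders,  // may be null
                                                                Exception exception) {
        // Start from the source headers so correlation IDs / schema pointers are preserved.
        Headers headers = sourceHeaders == null
                ? new RecordHeaders()
                : new RecordHeaders(sourceHeaders.toArray());
        // Hypothetical header names; the KIP defines the actual ones.
        headers.add("__streams.errors.exception",
                exception.getClass().getName().getBytes(StandardCharsets.UTF_8));
        headers.add("__streams.errors.stacktrace",
                stacktrace(exception).getBytes(StandardCharsets.UTF_8));

        return new ProducerRecord<>(dlqTopic, null, sourceRawKey, sourceRawValue, headers);
    }

    private static String stacktrace(Exception exception) {
        StringWriter out = new StringWriter();
        exception.printStackTrace(new PrintWriter(out));
        return out.toString();
    }
}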

On Fri, 12 Apr 2024 at 17:25, Damien Gasparina  wrote:
>
> Hi Nick,
>
> 1. Good point, that's less impactful than a custom interface, I just
> updated the KIP with the new signature.
>
> 1.a I really meant ProducerRecord, that's the class used to forward to
> downstream processors in the PAPI. The only information missing in
> this class is the topic name. I also considered relying on the Kafka
> Producer ProducerRecord, but I assume it would not be consistent with
> the KafkaStreams API.
>
> 2. Agreed
>
> 2.a I do think exceptions occurring during punctuate should be
> included in the DLQ.
> Even if building a suitable payload is almost impossible, even with
> custom code; those exceptions are still fatal for Kafka Streams by
> default and are something that can not be ignored safely.
> I do assume that most users would want to be informed if an error
> happened during a punctuate, even if only the metadata (e.g.
> stacktrace, exception) is provided.
> I am only concerned flooding the DLQ topic as, if a scheduled
> operation failed, very likely it will fails during the next
> invocation, but
>
> 4. Good point, I clarified the wording in the KIP to make it explicit.
>
> 5. Good point, I will clearly mention that it is out of scope as part
> of the KIP and might not be as trivial as people could expect. I will
> update the KIP once I do have some spare time.
>
> 6. Oh yeah, I didn't think about it, but forwarding input headers
> would definitely make sense. Confluent Schema Registry ID is actually
> part of the payload, but many correlation ID and technical metadata
> are passed through headers, it makes sense to forward them, specially
> as it is the default behavior of Kafka Streams,
>
>
>
> On Fri, 12 Apr 2024 at 15:25, Nick Telford  wrote:
> >
> > Hi Damien and Sebastien,
> >
> > 1.
> > I think you can just add a `String topic` argument to the existing
> > `withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]>
> > deadLetterQueueRecord)` method, and then the implementation of the
> > exception handler could choose the topic to send records to using whatever
> > logic the user desires. You could perhaps provide a built-in implementation
> > that leverages your new config to send all records to an untyped DLQ topic?
> >
> > 1a.
> > BTW you have a typo: in your DeserializationExceptionHandler, the type of
> > your `deadLetterQueueRecord` argument is `ProducerRecord`, when it should
> > probably be `ConsumerRecord`.
> >
> > 2.
> > Agreed. I think it's a good idea to provide an implementation that sends to
> > a single DLQ by default, but it's important to enable users to customize
> > this with their own exception handlers.
> >
> > 2a.
> > I'm not convinced that "errors" (e.g. failed punctuate) should be sent to a
> > DLQ topic like it's a bad record. To me, a DLQ should only contain records
> > that failed to process. I'm not even sure how a user would
> > re-process/action one of these other errors; it seems like the purview of
> > error logging to me?
> >
> > 4.
> > My point here was that I think it would be useful for the KIP to contain an
> > explanation of the behavior both with KIP-1033 and without it. i.e. clarify
> > if/how records that throw an exception in a processor are handled. At the
> > momen

Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-12 Thread Damien Gasparina
Hi Nick,

1. Good point, that's less impactful than a custom interface, I just
updated the KIP with the new signature.

1.a I really meant ProducerRecord, that's the class used to forward to
downstream processors in the PAPI. The only information missing in
this class is the topic name. I also considered relying on the Kafka
Producer ProducerRecord, but I assume it would not be consistent with
the KafkaStreams API.

2. Agreed

2.a I do think exceptions occurring during punctuate should be
included in the DLQ.
Even if building a suitable payload is almost impossible, even with
custom code, those exceptions are still fatal for Kafka Streams by
default and are something that cannot be ignored safely.
I do assume that most users would want to be informed if an error
happened during a punctuate, even if only the metadata (e.g.
stacktrace, exception) is provided.
I am only concerned about flooding the DLQ topic as, if a scheduled
operation fails, it will very likely fail again during the next
invocation, but

4. Good point, I clarified the wording in the KIP to make it explicit.

5. Good point, I will clearly mention that it is out of scope as part
of the KIP and might not be as trivial as people could expect. I will
update the KIP once I do have some spare time.

6. Oh yeah, I didn't think about it, but forwarding input headers
would definitely make sense. The Confluent Schema Registry ID is actually
part of the payload, but many correlation IDs and technical metadata
are passed through headers; it makes sense to forward them, especially
as it is the default behavior of Kafka Streams.



On Fri, 12 Apr 2024 at 15:25, Nick Telford  wrote:
>
> Hi Damien and Sebastien,
>
> 1.
> I think you can just add a `String topic` argument to the existing
> `withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]>
> deadLetterQueueRecord)` method, and then the implementation of the
> exception handler could choose the topic to send records to using whatever
> logic the user desires. You could perhaps provide a built-in implementation
> that leverages your new config to send all records to an untyped DLQ topic?
>
> 1a.
> BTW you have a typo: in your DeserializationExceptionHandler, the type of
> your `deadLetterQueueRecord` argument is `ProducerRecord`, when it should
> probably be `ConsumerRecord`.
>
> 2.
> Agreed. I think it's a good idea to provide an implementation that sends to
> a single DLQ by default, but it's important to enable users to customize
> this with their own exception handlers.
>
> 2a.
> I'm not convinced that "errors" (e.g. failed punctuate) should be sent to a
> DLQ topic like it's a bad record. To me, a DLQ should only contain records
> that failed to process. I'm not even sure how a user would
> re-process/action one of these other errors; it seems like the purview of
> error logging to me?
>
> 4.
> My point here was that I think it would be useful for the KIP to contain an
> explanation of the behavior both with KIP-1033 and without it. i.e. clarify
> if/how records that throw an exception in a processor are handled. At the
> moment, I'm assuming that without KIP-1033, processing exceptions would not
> cause records to be sent to the DLQ, but with KIP-1033, they would. If this
> assumption is correct, I think it should be made explicit in the KIP.
>
> 5.
> Understood. You may want to make this explicit in the documentation for
> users, so they understand the consequences of re-processing data sent to
> their DLQ. The main reason I raised this point is it's something that's
> tripped me up in numerous KIPs that that committers frequently remind me
> of; so I wanted to get ahead of it for once! :D
>
> And one new point:
> 6.
> The DLQ record schema appears to discard all custom headers set on the
> source record. Is there a way these can be included? In particular, I'm
> concerned with "schema pointer" headers (like those set by Schema
> Registry), that may need to be propagated, especially if the records are
> fed back into the source topics for re-processing by the user.
>
> Regards,
> Nick
>
>
> On Fri, 12 Apr 2024 at 13:20, Damien Gasparina 
> wrote:
>
> > Hi Nick,
> >
> > Thanks a lot for your review and your useful comments!
> >
> > 1. It is a good point, as you mentioned, I think it would make sense
> > in some use cases to have potentially multiple DLQ topics, so we
> > should provide an API to let users do it.
> > Thinking out-loud here, maybe it is a better approach to create a new
> > Record class containing the topic name, e.g. DeadLetterQueueRecord and
> > changing the signature to
> > withDeadLetterQueueRecords(Iterable<DeadLetterQueueRecord>
> > deadLetterQueueRecords) instead of
> > withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]>
> > deadLetterQueueRecord). What do you

Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-12 Thread Damien Gasparina
Hi Nick,

Thanks a lot for your review and your useful comments!

1. It is a good point, as you mentioned, I think it would make sense
in some use cases to have potentially multiple DLQ topics, so we
should provide an API to let users do it.
Thinking out loud here, maybe a better approach is to create a new
Record class containing the topic name, e.g. DeadLetterQueueRecord, and
change the signature to
withDeadLetterQueueRecords(Iterable<DeadLetterQueueRecord>
deadLetterQueueRecords) instead of
withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]>
deadLetterQueueRecord). What do you think? DeadLetterQueueRecord would
be something like "class DeadLetterQueueRecord extends
org.apache.kafka.streams.processor.api.ProducerRecord { String
topic; /* + getter/setter */ }"
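
For illustration only, a compilable variant of that idea could look like
the snippet below; it is written as a wrapper around the PAPI Record
(rather than a subclass) so that it compiles against today's API, and
the KIP may shape this differently:

import org.apache.kafka.streams.processor.api.Record;

// Illustrative only: a PAPI record plus the DLQ topic it should be sent to.
public class DeadLetterQueueRecordSketch<K, V> {
    private final Record<K, V> record;
    private String topic;

    public DeadLetterQueueRecordSketch(Record<K, V> record, String topic) {
        this.record = record;
        this.topic = topic;
    }

    public Record<K, V> record() { return record; }

    public String topic() { return topic; }

    public void setTopic(String topic) { this.topic = topic; }
}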

2. I think the root question here is: should we have one DLQ topic or
multiple DLQ topics by default. This question highly depends on the
context, but implementing a default implementation to handle multiple
DLQ topics would be opinionated, e.g. how to manage errors in a
punctuate?
I think it makes sense to have the default implementation writing all
faulty records to a single DLQ, that's at least the approach I used in
past applications: one DLQ per Kafka Streams application. Of course
the message format could change in the DLQ e.g. due to the source
topic, but those DLQ records will very likely be troubleshot, and
maybe replayed, manually anyway.
If a user needs to have multiple DLQ topics or wants to enforce a
specific schema, it's still possible, but they would need to implement
custom Exception Handlers.
Coming back to 1. I do agree that it would make sense to have the user
set the DLQ topic name in the handlers for more flexibility.

3. Good point, sorry it was a typo, the ProcessingContext makes much
more sense here indeed.

4. I do assume that we could implement KIP-1033 (Processing exception
handler) independently from KIP-1034. I do hope that KIP-1033 would be
adopted and implemented before KIP-1034, but if that's not the case,
we could implement KIP-1034 independently and update KIP-1033 to include
the DLQ record afterward (in the same KIP or in a new one if not
possible).

5. I think we should be clear that this KIP only covers the DLQ record produced.
Everything related to replaying messages or recovery plans should be
considered out of scope, as it is use-case and error specific.

Let me know if that's not clear; there are definitely points that are
highly debatable.

Cheers,
Damien

On Fri, 12 Apr 2024 at 13:00, Nick Telford  wrote:
>
> Oh, and one more thing:
>
> 5.
> Whenever you take a record out of the stream, and then potentially
> re-introduce it at a later date, you introduce the potential for record
> ordering issues. For example, that record could have been destined for a
> Window that has been closed by the time it's re-processed. I'd like to see
> a section that considers these consequences, and perhaps make those risks
> clear to users. For the record, this is exactly what sunk KIP-990, which
> was an alternative approach to error handling that introduced the same
> issues.
>
> Cheers,
>
> Nick
>
> On Fri, 12 Apr 2024 at 11:54, Nick Telford  wrote:
>
> > Hi Damien,
> >
> > Thanks for the KIP! Dead-letter queues are something that I think a lot of
> > users would like.
> >
> > I think there are a few points with this KIP that concern me:
> >
> > 1.
> > It looks like you can only define a single, global DLQ for the entire
> > Kafka Streams application? What about applications that would like to
> > define different DLQs for different data flows? This is especially
> > important when dealing with multiple source topics that have different
> > record schemas.
> >
> > 2.
> > Your DLQ payload value can either be the record value that failed, or an
> > error string (such as "error during punctuate"). This is likely to cause
> > problems when users try to process the records from the DLQ, as they can't
> > guarantee the format of every record value will be the same. This is very
> > loosely related to point 1. above.
> >
> > 3.
> > You provide a ProcessorContext to both exception handlers, but state they
> > cannot be used to forward records. In that case, I believe you should use
> > ProcessingContext instead, which statically guarantees that it can't be
> > used to forward records.
> >
> > 4.
> > You mention the KIP-1033 ProcessingExceptionHandler, but what's the plan
> > if KIP-1033 is not adopted, or if KIP-1034 lands before 1033?
> >
> > Regards,
> >
> > Nick
> >
> > On Fri, 12 Apr 2024 at 11:38, Damien Gasparina 
> > wrote:
> >
> >> In a general way, if the user does not configure the right ACL, that
> >> would be a security issue, 

Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-12 Thread Damien Gasparina
Generally speaking, if the user does not configure the right ACLs, that
would be a security issue, but that is true for any topic.

This KIP allows users to configure a Dead Letter Queue without writing
custom Java code in Kafka Streams, not at the topic level.
A lot of applications are already implementing this pattern, but the
required code to do it is quite painful and error prone, for example
most apps I have seen created a new KafkaProducer to send records to
their DLQ.

As it would be disabled by default for backward compatibility, I doubt
it would generate any security concern.
If a user explicitly configures a Dead Letter Queue, it would be up to
them to configure the relevant ACLs to ensure that the right principal
can access it.
It is already the case for all internal, input and output Kafka
Streams topics (e.g. repartition, changelog topics) that also could
contain confidential data, so I do not think we should implement a
different behavior for this one.

In this KIP, we configured the default DLQ record to have the initial
record key/value as we assume that it is the expected and wanted
behavior for most applications.
If a user does not want to have the key/value in the DLQ record for
any reason, they could still implement exception handlers to build
their own DLQ record.

Regarding ACL, maybe something smarter could be done in Kafka Streams,
but this is out of scope for this KIP.

On Fri, 12 Apr 2024 at 11:58, Claude Warren  wrote:
>
> My concern is that someone would create a dead letter queue on a sensitive
> topic and not get the ACL correct from the start.  Thus causing potential
> confidential data leak.  Is there anything in the proposal that would
> prevent that from happening?  If so I did not recognize it as such.
>
> On Fri, Apr 12, 2024 at 9:45 AM Damien Gasparina 
> wrote:
>
> > Hi Claude,
> >
> > In  this KIP, the Dead Letter Queue is materialized by a standard and
> > independant topic, thus normal ACL applies to it like any other topic.
> > This should not introduce any security issues, obviously, the right
> > ACL would need to be provided to write to the DLQ if configured.
> >
> > Cheers,
> > Damien
> >
> > On Fri, 12 Apr 2024 at 08:59, Claude Warren, Jr
> >  wrote:
> > >
> > > I am new to the Kafka codebase so please excuse any ignorance on my part.
> > >
> > > When a dead letter queue is established is there a process to ensure that
> > > it at least is defined with the same ACL as the original queue?  Without
> > > such a guarantee at the start it seems that managing dead letter queues
> > > will be fraught with security issues.
> > >
> > >
> > > On Wed, Apr 10, 2024 at 10:34 AM Damien Gasparina  > >
> > > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > To continue on our effort to improve Kafka Streams error handling, we
> > > > propose a new KIP to add out of the box support for Dead Letter Queue.
> > > > The goal of this KIP is to provide a default implementation that
> > > > should be suitable for most applications and allow users to override
> > > > it if they have specific requirements.
> > > >
> > > > In order to build a suitable payload, some additional changes are
> > > > included in this KIP:
> > > >   1. extend the ProcessingContext to hold, when available, the source
> > > > node raw key/value byte[]
> > > >   2. expose the ProcessingContext to the ProductionExceptionHandler,
> > > > it is currently not available in the handle parameters.
> > > >
> > > > Regarding point 2.,  to expose the ProcessingContext to the
> > > > ProductionExceptionHandler, we considered two choices:
> > > >   1. exposing the ProcessingContext as a parameter in the handle()
> > > > method. That's the cleanest way IMHO, but we would need to deprecate
> > > > the old method.
> > > >   2. exposing the ProcessingContext as an attribute in the interface.
> > > > This way, no method is deprecated, but we would not be consistent with
> > > > the other ExceptionHandler.
> > > >
> > > > In the KIP, we chose the 1. solution (new handle signature with old
> > > > one deprecated), but we could use other opinions on this part.
> > > > More information is available directly on the KIP.
> > > >
> > > > KIP link:
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams
> > > >
> > > > Feedbacks and suggestions are welcome,
> > > >
> > > > Cheers,
> > > > Damien, Sebastien and Loic
> > > >
> >
>
>
> --
> LinkedIn: http://www.linkedin.com/in/claudewarren


Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-12 Thread Damien Gasparina
Hi Claude,

In this KIP, the Dead Letter Queue is materialized by a standard and
independent topic, thus normal ACLs apply to it like any other topic.
This should not introduce any security issues, obviously, the right
ACL would need to be provided to write to the DLQ if configured.

Cheers,
Damien

On Fri, 12 Apr 2024 at 08:59, Claude Warren, Jr
 wrote:
>
> I am new to the Kafka codebase so please excuse any ignorance on my part.
>
> When a dead letter queue is established is there a process to ensure that
> it at least is defined with the same ACL as the original queue?  Without
> such a guarantee at the start it seems that managing dead letter queues
> will be fraught with security issues.
>
>
> On Wed, Apr 10, 2024 at 10:34 AM Damien Gasparina 
> wrote:
>
> > Hi everyone,
> >
> > To continue on our effort to improve Kafka Streams error handling, we
> > propose a new KIP to add out of the box support for Dead Letter Queue.
> > The goal of this KIP is to provide a default implementation that
> > should be suitable for most applications and allow users to override
> > it if they have specific requirements.
> >
> > In order to build a suitable payload, some additional changes are
> > included in this KIP:
> >   1. extend the ProcessingContext to hold, when available, the source
> > node raw key/value byte[]
> >   2. expose the ProcessingContext to the ProductionExceptionHandler,
> > it is currently not available in the handle parameters.
> >
> > Regarding point 2.,  to expose the ProcessingContext to the
> > ProductionExceptionHandler, we considered two choices:
> >   1. exposing the ProcessingContext as a parameter in the handle()
> > method. That's the cleanest way IMHO, but we would need to deprecate
> > the old method.
> >   2. exposing the ProcessingContext as an attribute in the interface.
> > This way, no method is deprecated, but we would not be consistent with
> > the other ExceptionHandler.
> >
> > In the KIP, we chose the 1. solution (new handle signature with old
> > one deprecated), but we could use other opinions on this part.
> > More information is available directly on the KIP.
> >
> > KIP link:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams
> >
> > Feedbacks and suggestions are welcome,
> >
> > Cheers,
> > Damien, Sebastien and Loic
> >


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-11 Thread Damien Gasparina
ly) and if we
> > don't do anything special for the DSL I am ok with moving forward with
> > this KIP as-is, but we should be aware of potential limitations for DSL
> > users. We can always do a follow up KIP to close gaps when we understand
> > the impact better -- covering the DSL would also expand the scope of
> > this KIP significantly...
> >
> > About the metric: just to double check. Do we think it's worth to add a
> > new metric? Or could we re-use the existing "dropped record metric"?
> >
> >
> >
> > -Matthias
> >
> >
> > On 4/10/24 5:11 AM, Sebastien Viale wrote:
> >> Hi,
> >>
> >> You are right, it will simplify types.
> >>
> >> We update the KIP
> >>
> >> regards
> >>
> >> Sébastien VIALE
> >>
> >> MICHELIN GROUP - Information Technology
> >> Technical Expert Kafka
> >>
> >>   Carmes / Bâtiment A17 4e / 63100 Clermont-Ferrand
> >>
> >> 
> >> From: Bruno Cadonna 
> >> Sent: Wednesday, 10 April 2024 10:38
> >> To: dev@kafka.apache.org 
> >> Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception
> >> handler for exceptions occuring during processing
> >>
> >> Hi Loïc, Damien, and Sébastien,
> >>
> >> Great that we are converging!
> >>
> >>
> >> 3.
> >> Damien and Loïc, I think in your examples the handler will receive a
> >> Record<Object, Object> because a Record<Object, Object> is passed to
> >> the processor in the following code line:
> >> https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152
> >>
> >> I see that we do not need to pass a Record<byte[], byte[]> into the
> >> handler just because we do that for the DeserializationExceptionHandler
> >> and the ProductionExceptionHandler. When those two handlers are called,
> >> the record is already serialized. This is not the case for the
> >> ProcessingExceptionHandler. However, I would propose to use Record<Object, Object>
> >> for the record that is passed to the ProcessingExceptionHandler because
> >> it makes the handler API more flexible.
> >>
> >>
> >> Best,
> >> Bruno
> >>
> >>
> >>
> >>
> >>
> >> On 4/9/24 9:09 PM, Loic Greffier wrote:
> >>  > Hi Bruno and Bill,
> >>  >
> >>  > To complete Damien's comments about point 3.
> >>  >
> >>  > Processing errors are caught and handled by the
> >> ProcessingExceptionHandler, at the precise moment when records are
> >> processed by processor nodes. The handling will be performed in the
> >> "process" method of the ProcessorNode, as follows:
> >>  >
> >>  > public void process(final Record<KIn, VIn> record) {
> >>  >     ...
> >>  >     try {
> >>  >         ...
> >>  >     } catch (final ClassCastException e) {
> >>  >         ...
> >>  >     } catch (final Exception e) {
> >>  >         final ProcessingExceptionHandler.ProcessingHandlerResponse response =
> >>  >             this.processingExceptionHandler
> >>  >                 .handle(internalProcessorContext, (Record<Object, Object>) record, e);
> >>  >
> >>  >         if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
> >>  >             throw new StreamsException("Processing exception handler is set to fail upon" +
> >>  >                 " a processing error. If you would rather have the streaming pipeline" +
> >>  >                 " continue after a processing error, please set the " +
> >>  >                 DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", e);
> >>  >         }
> >>  >     }
> >>  > }
> >>  > As you can see, the record is transmitted to the
> >> ProcessingExceptionHandler as a Record<Object, Object>, as we are
> >> dealing with the inp

[jira] [Created] (KAFKA-16505) KIP-1034: Dead letter queue in Kafka Streams

2024-04-10 Thread Damien Gasparina (Jira)
Damien Gasparina created KAFKA-16505:


 Summary: KIP-1034: Dead letter queue in Kafka Streams
 Key: KAFKA-16505
 URL: https://issues.apache.org/jira/browse/KAFKA-16505
 Project: Kafka
  Issue Type: Improvement
Reporter: Damien Gasparina


See KIP-1034: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams





[DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-04-10 Thread Damien Gasparina
Hi everyone,

To continue our effort to improve Kafka Streams error handling, we
propose a new KIP to add out-of-the-box support for Dead Letter Queues.
The goal of this KIP is to provide a default implementation that
should be suitable for most applications and allow users to override
it if they have specific requirements.

In order to build a suitable payload, some additional changes are
included in this KIP:
  1. extend the ProcessingContext to hold, when available, the source
node raw key/value byte[]
  2. expose the ProcessingContext to the ProductionExceptionHandler,
as it is currently not available in the handle() parameters.

Regarding point 2, to expose the ProcessingContext to the
ProductionExceptionHandler, we considered two choices:
  1. exposing the ProcessingContext as a parameter in the handle()
method. That's the cleanest way IMHO, but we would need to deprecate
the old method.
  2. exposing the ProcessingContext as an attribute in the interface.
This way, no method is deprecated, but we would not be consistent with
the other exception handlers.

In the KIP, we chose option 1 (a new handle signature, with the old
one deprecated), but other opinions on this part are welcome.
More information is available directly on the KIP.
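
For illustration, a user handler written against option 1 could look roughly
like the sketch below. The new overload is only an assumption of what the KIP
proposes (the exact signature is defined on the KIP page); the existing
handle(record, exception) method is the one currently shipped.

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.processor.api.ProcessingContext;

import java.util.Map;

public class LoggingProductionExceptionHandler implements ProductionExceptionHandler {

    // Sketch of the proposed new overload: same record and exception as today,
    // plus the ProcessingContext giving access to the task id, topic/partition/offset
    // and, when available, the raw source key/value.
    public ProductionExceptionHandlerResponse handle(final ProcessingContext context,
                                                     final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        System.err.printf("Failed to produce to %s from task %s: %s%n",
                record.topic(), context.taskId(), exception.getMessage());
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    // Existing overload, which the KIP proposes to deprecate.
    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}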

KIP link: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams

Feedback and suggestions are welcome,

Cheers,
Damien, Sebastien and Loic


Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-09 Thread Damien Gasparina
> > >> where
> > >> the record is actually forwarded to, and could cause confusion if
> users
> > >> aren't aware that the handler is effectively calling from the context
> > >> of the
> > >> processor that threw the exception.
> > >> 2a. If you don't explicitly want the ability to forward records, I
> would
> > >> suggest changing the type of this parameter to ProcessingContext,
> which
> > >> has all the metadata and useful info of the ProcessorContext but
> without
> > >> the
> > >> forwarding APIs. This would also let us sidestep the following issue:
> > >> 2b. If you *do* want the ability to forward records, setting aside
> > >> whether
> > >> that
> > >> in of itself makes sense to do, we would need to pass in either a
> > regular
> > >> ProcessorContext or a FixedKeyProcessorContext, depending on what kind
> > >> of processor it is. I'm not quite sure how we could design a clean API
> > >> here,
> > >> so I'll hold off until you clarify whether you even want forwarding or
> > >> not.
> > >> We would also need to split the input record into a Record vs
> > >> FixedKeyRecord
> > >>
> > >> 3. One notable difference between this handler and the existing ones
> you
> > >> pointed out, the Deserialization/ProductionExceptionHandler, is that
> the
> > >> records passed in to those are in serialized bytes, whereas the records
> > >> here would be POJOs. You account for this by making the parameter
> > >> type a Record<Object, Object>, but I just wonder how users would be
> > >> able to read the key/value and figure out what type it should be. For
> > >> example, would they need to maintain a map from processor name to
> > >> input record types?
> > >>
> > >> If you could provide an example of this new feature in the KIP, it
> > >> would be
> > >> very helpful in understanding whether we could do something to make it
> > >> easier for users to use, or if it would be fine as-is.
> > >>
> > >> 4. We should include all the relevant info for a new metric, such as
> the
> > >> metric
> > >> group and recording level. You can look at other metrics KIPs like
> > >> KIP-444
> > >> and KIP-613 for an example. I suspect you intend for this to be in the
> > >> processor group and at the INFO level?
> > >>
> > >> Hope that all makes sense! Thanks again for the KIP
> > >>
> > >> -Sophie
> > >>
> > >> On Fri, Mar 29, 2024 at 6:16 AM Damien Gasparina <
> d.gaspar...@gmail.com
> > >
> > >> wrote:
> > >>
> > >>> Hi everyone,
> > >>>
> > >>> After writing quite a few Kafka Streams applications, me and my
> > >>> colleagues
> > >>> just created KIP-1033 to introduce a new Exception Handler in Kafka
> > >>> Streams
> > >>> to simplify error handling.
> > >>> This feature would allow defining an exception handler to
> automatically
> > >>> catch exceptions occurring during the processing of a message.
> > >>>
> > >>> KIP link:
> > >>>
> > >>>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing
> > >
> > >>>
> > >>> Feedbacks and suggestions are welcome,
> > >>>
> > >>> Cheers,
> > >>> Damien, Sebastien and Loic
> > >>>
> > >>
> > >>
> > >>
> >
>


[jira] [Created] (KAFKA-16448) Add Kafka Streams exception handler for exceptions occuring during processing (KIP-1033)

2024-03-29 Thread Damien Gasparina (Jira)
Damien Gasparina created KAFKA-16448:


 Summary: Add Kafka Streams exception handler for exceptions 
occuring during processing (KIP-1033)
 Key: KAFKA-16448
 URL: https://issues.apache.org/jira/browse/KAFKA-16448
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Damien Gasparina


Jira to follow work on the KIP: 
[KIP-1033: Add Kafka Streams exception handler for exceptions occuring during 
processing|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing]





[DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-03-29 Thread Damien Gasparina
Hi everyone,

After writing quite a few Kafka Streams applications, my colleagues and I
just created KIP-1033 to introduce a new Exception Handler in Kafka Streams
to simplify error handling.
This feature would allow defining an exception handler to automatically
catch exceptions occurring during the processing of a message.
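
For illustration, a handler built against the proposed interface could look
roughly like the sketch below. The ProcessingExceptionHandler interface, its
ProcessingHandlerResponse enum and the DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG
config are the names used in this discussion; the exact API is defined in the KIP.

import org.apache.kafka.streams.processor.api.ProcessingContext;
import org.apache.kafka.streams.processor.api.Record;

// Rough sketch against the interface proposed in KIP-1033; only meant to
// illustrate the intent, the final API is defined in the KIP.
public class LogAndContinueProcessingExceptionHandler implements ProcessingExceptionHandler {

    @Override
    public ProcessingHandlerResponse handle(final ProcessingContext context,
                                            final Record<Object, Object> record,
                                            final Exception exception) {
        // The record holds the in-flight POJOs (not serialized bytes).
        System.err.printf("Dropping record (task %s, key type %s): %s%n",
                context.taskId(),
                record.key() == null ? "null" : record.key().getClass().getName(),
                exception);
        return ProcessingHandlerResponse.CONTINUE;
    }
}

Such a handler would then be registered via the
DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG property discussed later in
this thread.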

KIP link:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing

Feedback and suggestions are welcome,

Cheers,
Damien, Sebastien and Loic


Requesting permissions to contribute to Apache Kafka.

2024-03-01 Thread Damien Gasparina
Hi team,

I would like permission to contribute to Kafka.
My wiki ID is "d.gasparina" and my Jira ID is "Dabz".

I would like to propose a KIP to improve Kafka Streams error and exception
handling.

Cheers,
Damien


[jira] [Created] (KAFKA-14302) Infinite probing rebalance if a changelog topic got emptied

2022-10-14 Thread Damien Gasparina (Jira)
Damien Gasparina created KAFKA-14302:


 Summary: Infinite probing rebalance if a changelog topic got 
emptied
 Key: KAFKA-14302
 URL: https://issues.apache.org/jira/browse/KAFKA-14302
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.3.1
Reporter: Damien Gasparina
 Attachments: image-2022-10-14-12-04-01-190.png

If a store with a changelog topic has been fully emptied, it can trigger 
infinite probing rebalances.

 

The scenario is the following:
 * A Kafka Streams application has a store with a changelog
 * Many entries are pushed into the changelog, so the log end offset is high, 
let's say 20,000
 * Then the store gets emptied, either due to data retention (windowing) or 
tombstones
 * Then an instance of the application is restarted
 * It restores the store from the changelog, but does not write a checkpoint 
file as no data was pushed at all
 * As there are no checkpoint entries, this instance reports a taskOffsetSums 
with the offset set to 0 in the subscriptionUserData
 * The group leader, during the assignment, then computes a lag of 20,000 (end 
offset - task offset), which is greater than the default acceptable lag, and thus 
decides to schedule a probing rebalance (see the simplified sketch below)
 * In the next probing rebalance, nothing has changed, so... a new probing rebalance is scheduled
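
Simplified sketch of that decision (not the actual assignor code; 10,000 is the 
default {{acceptable.recovery.lag}}):
{code:java}
// Simplified sketch of the assignment decision described above.
long endOffset = 20_000L;              // log end offset of the changelog
long taskOffsetSum = 0L;               // reported by the instance without a checkpoint
long acceptableRecoveryLag = 10_000L;  // default acceptable.recovery.lag

long lag = endOffset - taskOffsetSum;  // 20,000
if (lag > acceptableRecoveryLag) {
    // a probing rebalance is scheduled; since the store stays empty and no
    // checkpoint is ever written, the lag never shrinks and this repeats forever
    System.out.println("schedule probing rebalance, lag=" + lag);
}
{code}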

 

I was able to reproduce locally with a simple topology:

 
{code:java}
var table = streamsBuilder.stream("table");
streamsBuilder
.stream("stream")
.join(table, (eSt, eTb) -> eSt.toString() + eTb.toString(), 
JoinWindows.of(Duration.ofSeconds(5)))
.to("output");{code}

Due to this issue, applications having an empty changelog experience 
frequent rebalances:

!image-2022-10-14-12-04-01-190.png!

 





[jira] [Created] (KAFKA-13636) Committed offsets could be deleted during a rebalance if a group did not commit for a while

2022-02-01 Thread Damien Gasparina (Jira)
Damien Gasparina created KAFKA-13636:


 Summary: Committed offsets could be deleted during a rebalance if 
a group did not commit for a while
 Key: KAFKA-13636
 URL: https://issues.apache.org/jira/browse/KAFKA-13636
 Project: Kafka
  Issue Type: Bug
  Components: core, offset manager
Affects Versions: 3.0.0, 2.8.1, 2.7.2, 2.6.2, 2.5.1, 2.4.0
Reporter: Damien Gasparina


The group coordinator might incorrectly delete committed offsets during a group 
rebalance. During a rebalance, the coordinator relies on the last commit timestamp 
({_}offsetAndMetadata.commitTimestamp{_}) instead of the last state modification 
timestamp ({_}currentStateTimestamp{_}) to detect expired offsets.

 

This is relatively easy to reproduce by playing with 
group.initial.rebalance.delay.ms, offsets.retention.minutes and 
offsets.retention.check.interval.ms; I uploaded an example on: 
[https://github.com/Dabz/kafka-example/tree/master/docker/offsets-retention] .

This script does:
 * Starts a broker with: offsets.retention.minutes=2, 
offsets.retention.check.interval.ms=1000, group.initial.rebalance.delay.ms=2
 * Produces 10 messages
 * Creates a consumer group to consume the 10 messages, with auto.commit disabled so 
it only commits a few times
 * Waits 3 minutes, then the consumer gets a {{kill -9}}
 * Restarts the consumer after a few seconds
 * The consumer restarts from {{auto.offset.reset}}, as the offsets got removed

 

The cause lies in GroupMetadata.scala:
 * When the group gets emptied, {{subscribedTopics}} is set to {{Set.empty}} 
([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L520-L521])
 * When a new member joins, it is added to the group right away; BUT 
{{subscribedTopics}} is only updated once the migration is over (in 
initNewGeneration), which could take a while due to 
{{group.initial.rebalance.delay}}
 * When the log cleaner runs, {{subscribedTopics.isDefined}} returns 
true as {{Set.empty != None}} (the underlying condition)
 * Thus we enter 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L782-L785]
 with an empty {{subscribedTopics}} list and rely on the 
{{commitTimestamp}} regardless of the {{currentStateTimestamp}}

 

This seems to be a regression introduced by KIP-496: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets#KIP496:AdministrativeAPItodeleteconsumeroffsets-ProposedChanges





[jira] [Created] (KAFKA-13109) WorkerSourceTask is not enforcing the errors.retry.timeout and errors.retry.delay.max.ms parameters in case of a RetriableException during task.poll()

2021-07-20 Thread Damien Gasparina (Jira)
Damien Gasparina created KAFKA-13109:


 Summary: WorkerSourceTask is not enforcing the 
errors.retry.timeout and errors.retry.delay.max.ms parameters in case of a 
RetriableException during task.poll()
 Key: KAFKA-13109
 URL: https://issues.apache.org/jira/browse/KAFKA-13109
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.8.0
Reporter: Damien Gasparina


It seems that {{errors.retry.timeout}} is not enforced if a 
{{RetriableException}} is thrown in the {{poll()}} of a SourceTask.

Looking at Kafka Connect source code:
 * If a task throws a {{RetriableException}} during a {{poll()}}, the Connect 
runtime catches it and returns null: 
[https://github.com/apache/kafka/blob/2.8.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L273-L277]
 * Then, {{toSend}} is set to null, and the runtime continues the loop and 
re-executes the next iteration of poll() without any delay: 
[https://github.com/apache/kafka/blob/2.8.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L240-L246]

 

This implies that, if {{poll()}} throws a {{RetriableException}}:
 * {{errors.retry.timeout}} is ignored and the task retries indefinitely
 * there is no delay between retries ({{errors.retry.delay.max.ms}} is 
ignored), causing potentially high resource utilization and log flooding

 

My understanding of 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect]
 is that {{errors.retry.timeout}} and {{errors.retry.delay.max.ms}} should have 
been respected in case of a {{RetriableException}} during a Source Task 
{{poll()}}
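
For illustration, a source task like this hypothetical one currently retries 
forever with no backoff when its backend is unavailable:
{code:java}
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.util.List;
import java.util.Map;

// Hypothetical task, only used to illustrate the behaviour described above.
public class FlakySourceTask extends SourceTask {

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // Signalling a transient failure as retriable currently makes
        // WorkerSourceTask call poll() again immediately, ignoring
        // errors.retry.timeout and errors.retry.delay.max.ms.
        throw new RetriableException("backend unavailable");
    }

    @Override public void start(final Map<String, String> props) { }
    @Override public void stop() { }
    @Override public String version() { return "0.1"; }
}
{code}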



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13024) Kafka Streams is dropping messages with null key during repartition

2021-07-01 Thread Damien Gasparina (Jira)
Damien Gasparina created KAFKA-13024:


 Summary: Kafka Streams is dropping messages with null key during 
repartition
 Key: KAFKA-13024
 URL: https://issues.apache.org/jira/browse/KAFKA-13024
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.7.1, 2.8.0
Reporter: Damien Gasparina


{{KStream.repartition}} silently filters out messages with null keys. A simple 
topology like {{.stream().repartition().to()}} would filter out all messages with 
a null key.
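
Minimal topology showing the behaviour (topic names are placeholders):
{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

// Any record with a null key on "input" never reaches "output",
// because repartition() silently filters null-key messages.
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("input")
       .repartition()
       .to("output");
final Topology topology = builder.build();
{code}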

We are adding a filter before the repartition 
([https://github.com/apache/kafka/blob/2.8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L1060-L1064]).
 It looks like we are doing that because this method is also used for building a 
KTable.

Null-key messages are valid for a KStream, so this looks like a regression: the 
previous {{.through()}} did not filter null-key messages.





[jira] [Created] (KAFKA-12951) Infinite loop while restoring a GlobalKTable

2021-06-15 Thread Damien Gasparina (Jira)
Damien Gasparina created KAFKA-12951:


 Summary: Infinite loop while restoring a GlobalKTable
 Key: KAFKA-12951
 URL: https://issues.apache.org/jira/browse/KAFKA-12951
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Damien Gasparina


We encountered an issue a few times in some of our Kafka Streams applications.
 After an unexpected restart of the application, some instances were not 
able to resume operating.

They got stuck while trying to restore the state store of a GlobalKTable. The 
only way to resume operating was to manually delete their `state.dir`.

We observed the following timeline:
 * After the restart of the Kafka Streams application, it tries to restore its 
GlobalKTable
 * It seeks to the last checkpoint available on the {{state.dir}}: 382 
([https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L259])
 * The watermark lookup ({{endOffset}} result) returned offset 383 
{code:java}
handling ListOffsetResponse response for XX. Fetched offset 383, timestamp 
-1{code}

 * We enter the loop: 
[https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L279]
 * Then we invoke {{poll()}}, but the poll returns nothing, so we enter: 
[https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L306]
 and we crash (x)
{code:java}
Global task did not make progress to restore state within 30 ms.{code}

 * The pod restarts, and we encounter the same issue until we manually delete 
the {{state.dir}}

 

Regarding the topic, by leveraging the {{DumpLogSegments}} tool, I can see:
 * {{Offset 381}} - Last business message received
 * {{Offset 382}} - Txn COMMIT (last message)

I think the real culprit is that the checkpoint is {{383}} instead of being 
{{382}}. For information, this is a compacted topic, and just before the 
outage, we encountered some ISR shrinking and leader changes.

While experimenting with the API, it seems that the {{consumer.position()}} 
call is a bit tricky: right after a {{seek()}}, {{position()}} returns the 
seeked position. After a {{poll()}} call, however, even if no data is returned, 
{{position()}} returns the LSO. I 
did an example on 
[https://gist.github.com/Dabz/9aa0b4d1804397af6e7b6ad8cba82dcb] .
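
Sketch of that experiment with a plain consumer (the topic name is a 
placeholder; the offsets follow the example above):
{code:java}
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

// position() right after seek() returns the seeked offset,
// while after an empty poll() it returns the LSO.
final Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("isolation.level", "read_committed");

try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
    final TopicPartition tp = new TopicPartition("global-store-changelog", 0);
    consumer.assign(Collections.singletonList(tp));
    consumer.seek(tp, 382);
    System.out.println(consumer.position(tp)); // 382, the seeked position
    consumer.poll(Duration.ofSeconds(1));      // returns no data here
    System.out.println(consumer.position(tp)); // 383, the LSO
}
{code}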





[jira] [Created] (KAFKA-12272) Kafka Streams metric commit-latency-max and commit-latency-avg is always 0

2021-02-02 Thread Damien Gasparina (Jira)
Damien Gasparina created KAFKA-12272:


 Summary: Kafka Streams metric commit-latency-max and 
commit-latency-avg is always 0
 Key: KAFKA-12272
 URL: https://issues.apache.org/jira/browse/KAFKA-12272
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.6.1, 2.7.0
Reporter: Damien Gasparina
 Attachments: KS-2.6.0.png, KS-2.7.0.png

After upgrading to Kafka Streams 2.7.0, the JMX metrics commit-latency-max and 
commit-latency-avg are always equal to 0.


For the same application, with Kafka Streams 2.6.0 and below, I can observe:
 !KS-2.6.0.png! 


With Kafka Streams 2.7.0:
 !KS-2.7.0.png! 


By quickly looking at the issue, I got the feeling it is a side effect of 
https://github.com/apache/kafka/pull/9634.

We set _now_ to the current time in the _maybeCommit()_ function: 
https://github.com/apache/kafka/blob/2.7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L930
 

And just after, we compute the latency as _Time.milliseconds() - now_ (which we 
just refreshed): 
https://github.com/apache/kafka/blob/2.7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L692
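
In other words, the measurement effectively becomes (simplified sketch, not the 
actual StreamThread code):
{code:java}
import org.apache.kafka.common.utils.Time;

// Simplified sketch of the ordering issue described above.
final Time time = Time.SYSTEM;

// ... the commit work happens ...
long now = time.milliseconds();                        // refreshed inside maybeCommit()
final long commitLatency = time.milliseconds() - now;  // computed just after => ~0
// commit-latency-max / commit-latency-avg then record this ~0 value.
{code}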





[jira] [Created] (KAFKA-7129) Dynamic default value for number of thread configuration

2018-07-03 Thread Damien Gasparina (JIRA)
Damien Gasparina created KAFKA-7129:
---

 Summary: Dynamic default value for number of thread configuration
 Key: KAFKA-7129
 URL: https://issues.apache.org/jira/browse/KAFKA-7129
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Damien Gasparina


There are properties in the broker to change the number of threads of a 
component (e.g. _num.replica.fetchers_ or _num.network.threads_). After 
discussing with [~astubbs], it seems that the default values are optimized for 
an 8-CPU machine and might not be optimized for larger machines (e.g. 48 cores).

For those larger machines, an admin needs to tune them to be able to use all 
the resources of the host.

Having dynamic default values (e.g. _num.replica.fetchers_ = ceil(number of cores 
/ 8), etc.) instead of static ones (e.g. _num.replica.fetchers_ = 1) could be a more 
efficient strategy to provide default values optimized for different kinds of 
deployment.
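
For example, the proposed heuristic would translate to something like:
{code:java}
// Sketch of the proposed heuristic: scale thread-count defaults with the host size.
final int cores = Runtime.getRuntime().availableProcessors();
final int numReplicaFetchers = (int) Math.ceil(cores / 8.0);
// 8 cores  -> 1 fetcher (today's static default)
// 48 cores -> 6 fetchers
{code}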


