Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
Hi Damien,

BC1
> In my opinion DLQ topics should be viewed as a sink topic and
> AFAIK, this tool does not clean up sink topics.

Maybe, but one could also argue that DLQ topics are part of the runtime, because they collect errors that occurred during the runtime and that might be specific to a given execution of the Streams app. One might want to reset the errors when starting from scratch.

BC2
You are right, the configs are passed at creation time. My bad!

BC3
I think the `with`-syntax fits well when building objects that contain some kind of configuration, like Materialized, that you usually pass into an API. However, the handler returns the response, and the response instructs something: it says CONTINUE processing or FAIL the processing. With your KIP the response gets an additional instruction, namely `andAddToDeadLetterQueue`. I would not sacrifice better readability for consistency in this case.

Best,
Bruno

On 9/3/24 3:18 PM, Damien Gasparina wrote:
> [...]
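Bruno's readability argument can be made concrete with a small, self-contained sketch. It assumes, as in the thread, that CONTINUE and FAIL are enum constants. Enum constants are singletons, so a method such as `andAddToDeadLetterQueue` cannot store per-call records on the constant itself. It has to return a new value object instead. The enum below is a local, simplified stand-in named after the thread's `ProcessingHandlerResponse`. `DeadLetterRecord` stands in for `ProducerRecord<byte[], byte[]>`, and `HandlerResult` is a hypothetical result type. None of this is the actual Kafka Streams API.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for ProducerRecord<byte[], byte[]>: topic, key and value only.
final class DeadLetterRecord {
    final String topic;
    final byte[] key;
    final byte[] value;

    DeadLetterRecord(String topic, byte[] key, byte[] value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

// Hypothetical immutable result: the instruction plus the records destined for the DLQ.
final class HandlerResult {
    final ProcessingHandlerResponse response;
    final List<DeadLetterRecord> deadLetterQueueRecords;

    HandlerResult(ProcessingHandlerResponse response, List<DeadLetterRecord> records) {
        this.response = response;
        // Defensive copy so the result cannot be changed after it is returned.
        this.deadLetterQueueRecords = Collections.unmodifiableList(new ArrayList<>(records));
    }
}

// Simplified local stand-in for the response enum discussed in the thread.
enum ProcessingHandlerResponse {
    CONTINUE,
    FAIL;

    // Reads as "CONTINUE and add to the dead letter queue". The constant is shared,
    // so the records travel in a new HandlerResult rather than on the enum itself.
    HandlerResult andAddToDeadLetterQueue(List<DeadLetterRecord> records) {
        return new HandlerResult(this, records);
    }
}

final class ReadabilityExample {
    // Mirrors the body of the customized handler proposed in the thread.
    static HandlerResult handle() {
        return ProcessingHandlerResponse.CONTINUE
            .andAddToDeadLetterQueue(Collections.singletonList(new DeadLetterRecord(
                "app-dlq",
                "Hello".getBytes(StandardCharsets.UTF_8),
                "World".getBytes(StandardCharsets.UTF_8))));
    }
}
```

`FAIL.andAddToDeadLetterQueue(...)` reads the same way with this shape. Under these assumptions, the `with...` versus `and...` question only concerns the verb, not the mechanics underneath.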
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
Hi Bruno,

Thanks a lot for your comments!

> BC1
> Do you also plan to add an argument to the StreamsResetter tool to
> delete the default DLQ topic when a Streams app is reset?

Good point, I did not think about the StreamsResetter tool. Thinking out loud, I am not sure it is a good idea to add an option to clean them up. In my opinion DLQ topics should be viewed as a sink topic and AFAIK, this tool does not clean up sink topics.

> BC2

In the case of custom exception handlers, they can get the errors.deadletterqueue.topic.name configuration by overriding `void configure(Map<String, ?> configs);`. As that is the place where all the configuration can be accessed, I think it's the best way, no? I will think about it a bit further; it might be useful/convenient for users.

> BC3

The "with" syntax is used in many places in the Kafka Streams public classes, which is why I used it, e.g. Materialized.with(), Consumed.withKeySerde(...).withValueSerde(...), Grouped.withName(...). I do agree that .andAddToDeadLetterQueue is more intuitive, but I would argue that being consistent is better in this situation. What do you think?

Cheers,
Damien

On Tue, 3 Sept 2024 at 14:59, Bruno Cadonna wrote:
> [...]
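Damien's BC2 answer can be sketched minimally: a custom handler picks up `errors.deadletterqueue.topic.name` in `configure(Map<String, ?> configs)`, the same signature as Kafka's `Configurable#configure`. To stay runnable without Kafka on the classpath, the class below does not implement the real handler interface. `DlqAwareHandler` is hypothetical, and the config key is the one used in this thread, which the KIP may still rename.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical handler. A real one would implement the Kafka Streams exception
// handler interface; only the configure(Map<String, ?>) signature is taken from
// Kafka's Configurable contract.
final class DlqAwareHandler {
    // Config key as named in this thread (assumption: the final KIP may rename it).
    static final String DLQ_TOPIC_CONFIG = "errors.deadletterqueue.topic.name";

    // Stays null when the application does not set the config.
    private String deadLetterQueueTopic;

    // Invoked once, at creation time, with all application configs.
    public void configure(final Map<String, ?> configs) {
        final Object value = configs.get(DLQ_TOPIC_CONFIG);
        this.deadLetterQueueTopic = value == null ? null : value.toString();
    }

    Optional<String> deadLetterQueueTopic() {
        return Optional.ofNullable(deadLetterQueueTopic);
    }
}
```

`configure` runs when the handler is created, so the topic name is known before the first error is handled. This is the point Bruno concedes in his BC2 reply.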
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
Hi,

Thanks for the updates! I have a couple of comments.

BC1
Do you also plan to add an argument to the StreamsResetter tool to delete the default DLQ topic when a Streams app is reset?

BC2
Would it make sense to add errors.deadletterqueue.topic.name to the ErrorHandlerContext in case a custom exception handler wants to write to the configured DLQ topic? For example, if only one of the handlers needs to be customized but all handlers should write to the configured DLQ topic.

BC3
What do you think about renaming withDeadLetterQueueRecords() to andAddToDeadLetterQueue()? A customized handler would look like:

    public ProcessingHandlerResponse handle(
        final ErrorHandlerContext context,
        final Record<?, ?> record,
        final Exception exception
    ) {
        return ProcessingHandlerResponse.CONTINUE
            .andAddToDeadLetterQueue(
                Collections.singletonList(new ProducerRecord<>(
                    "app-dlq",
                    "Hello".getBytes(StandardCharsets.UTF_8),
                    "World".getBytes(StandardCharsets.UTF_8)
                ))
            );
    }

I think the code becomes more readable.

Best,
Bruno

On 8/30/24 3:37 PM, Damien Gasparina wrote:
> [...]
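Bruno's BC2 suggestion can be sketched the same way. The `deadLetterQueueTopicName()` accessor below is hypothetical. It is what the proposal would add to the context, not an existing method of `ErrorHandlerContext`. The sketch shows a customized handler that makes its own decision (here, always continue) but still writes to whatever DLQ topic the application configured. When no topic is set, it skips the DLQ. `SketchErrorHandlerContext`, `DlqRecord` and `Decision` are illustrative stand-ins.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical context: only the accessor proposed in BC2 is modelled.
interface SketchErrorHandlerContext {
    Optional<String> deadLetterQueueTopicName();
}

// Hypothetical stand-in for ProducerRecord<byte[], byte[]> (value only, for brevity).
final class DlqRecord {
    final String topic;
    final byte[] value;

    DlqRecord(String topic, byte[] value) {
        this.topic = topic;
        this.value = value;
    }
}

// Hypothetical result: continue/fail flag plus the records destined for the DLQ.
final class Decision {
    final boolean continueProcessing;
    final List<DlqRecord> deadLetterQueueRecords;

    Decision(boolean continueProcessing, List<DlqRecord> records) {
        this.continueProcessing = continueProcessing;
        this.deadLetterQueueRecords = records;
    }
}

final class ContinueAndReuseConfiguredDlq {
    // Custom logic decides to continue; the DLQ topic comes from the shared config,
    // so this handler does not need its own copy of the topic name.
    static Decision handle(SketchErrorHandlerContext context, byte[] rawValue) {
        List<DlqRecord> records = context.deadLetterQueueTopicName()
            .map(topic -> Collections.singletonList(new DlqRecord(topic, rawValue)))
            .orElse(Collections.emptyList());
        return new Decision(true, records);
    }
}
```

The alternative Damien describes, reading the topic in `configure`, gives the same result without changing the context. The trade-off is whether every custom handler has to repeat that lookup.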
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
Hi everyone,

We just updated KIP-1034. We changed the following:
- We included the ProcessingExceptionHandler (KIP-1033) directly in the KIP;
- We provided examples to clarify the new configuration and how it could be leveraged.

I think we can resume the conversation on this KIP.

Cheers,
Damien, Sebastien and Loic

On Tue, 27 Aug 2024 at 15:37, Sebastien Viale wrote:
> [...]
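Here is a minimal sketch of enabling the DLQ through configuration alone. According to Damien's B3 answer in this thread, this affects only the default handlers. It uses plain `java.util.Properties` so that it runs without Kafka. The key `errors.deadletterqueue.topic.name` is the one used in this discussion, and the final KIP may use a different name. `application.id` and `bootstrap.servers` are standard Kafka Streams settings. The values are placeholders.

```java
import java.util.Properties;

final class DlqConfigExample {
    static Properties streamsProperties() {
        final Properties props = new Properties();
        props.put("application.id", "my-streams-app");   // placeholder value
        props.put("bootstrap.servers", "localhost:9092"); // placeholder value
        // Key as named in this thread. Only the default exception handlers act on it;
        // custom handlers ignore it and decide their DLQ records themselves.
        props.put("errors.deadletterqueue.topic.name", "my-streams-app-dlq");
        return props;
    }
}
```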
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
Hi Bruno,

We have planned a meeting for next Friday to discuss it with Loic and Damien.

We will be able to restart discussions about it soon.

Regards

From: Bruno Cadonna
Sent: Monday, 26 August 2024 11:32
To: dev@kafka.apache.org
Subject: [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
[...]
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
Hi Loïc, Sebastien, and Damien,

Now that KIP-1033 is going to be released in 3.9, what is the plan to progress with this KIP?

Is the KIP up-to-date, so that we can restart the discussion?

Best,
Bruno

On 6/13/24 6:16 PM, Damien Gasparina wrote:

Hi Bruno,

We focused our effort (well, mostly Seb and Loic :)) on KIP-1033, which is why not much progress has been made on this one yet. Regarding your points:

B1: It is up to the user to specify the DLQ topic name and to implement a potential differentiation. I tend to think that having one DLQ per application ID is the wisest, but I encountered cases and applications that preferred having a shared DLQ topic between multiple applications, e.g. to reduce the number of partitions, or to ease monitoring.

B2: Good catch, it should be ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG. We removed the "DEFAULT" prefix during the discussion; looks like I forgot to update all occurrences in the KIP.

B3: The trigger for sending to the DLQ would be if ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG is set OR if the user implemented a custom exception handler that returns DLQ records. ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG would only override the behavior of the default handler; custom exception handlers will completely ignore this parameter.

I think it's a good trade-off between providing a production-ready default implementation and providing sufficient flexibility for complex use-cases. This behavior definitely needs to be documented, but I guess it's safe to push the responsibility for the DLQ records to the user if they implement custom handlers.

Cheers,
Damien

On Tue, 11 Jun 2024 at 17:00, Bruno Cadonna wrote:

Hi,

Since there was not much activity in this thread recently, I was wondering what the status of this discussion is.

I cannot find the examples in the KIP Sébastien mentioned in the last message to this thread. I also cannot find the corresponding definition of the following method call in the KIP:

FAIL.withDeadLetterQueueRecord(record, "dlq-topic")

I also have some comments:

B1
Did you consider prefixing the dead letter queue topic names with the application ID to distinguish the topics between Streams apps? Or is the user responsible for the differentiation? If the user is responsible, we risk that faulty records of different Streams apps end up in the same dead letter queue.

B2
Is the name of the dead letter queue topic config DEFAULT_ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG or ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG? In the KIP both names are used.

B3
What exactly is the trigger to send a record to the dead letter queue? Is it setting ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, or is it adding a record to the return value of the exception handler? What happens if I set ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG but do not add a record to the return value of the handler? What happens if I do not set ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG but add a record to the return value of the handler?

Best,
Bruno

On 4/22/24 10:19 PM, Sebastien Viale wrote:

Hi,

Thanks for your remarks.

L1: I would say "who can do the most can do the least": even though most people will fail and stop, we found it interesting to offer the possibility to fail-and-send-to-DLQ.

L2: We did not consider extending the TimestampExtractor because we consider it out of scope for this KIP. Perhaps it will be possible to include it in an ExceptionHandler later.

L3: We will include an example in the KIP, but as we mentioned earlier, the DLQ topic can be different in each custom exception handler. When providing custom handlers, users would have the possibility to return:
* FAIL
* CONTINUE
* FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
* CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic")

Cheers!
Sébastien

From: Lucas Brutschy
Sent: Monday, 22 April 2024 14:36
To: dev@kafka.apache.org
Subject: [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

Hi!

Thanks for the KIP, great stuff.

L1. I was a bit confused that the default configuration (once you set a DLQ topic) is going to be fail-and-send-to-DLQ, if I understood correctly. Is this something that will be a common use-case, and is it a configuration that we want to encourage? I expected that you either want to fail or skip-and-send-to-DLQ.

L2. Have you considered extending the `TimestampExtractor` interface so that it can also produce to DLQ? AFAIK it's not covered by any of the existing exception handlers, but it can cause similar failures (potentially custom logic, depending on the validity of the input record). There could also be a default implementation as a subclass of `ExtractRecordMetadataTimestamp`.

L3. It would be nice to include an example of how to produce to m
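Damien's B3 answer, quoted above, can be read as a small decision rule that covers Bruno's what-if questions. The sketch encodes that reading under two assumptions. First, the default handler produces a DLQ record only when `errors.deadletterqueue.topic.name` is set. Second, a custom handler's returned DLQ records are produced as-is, whether or not the config is set. `DlqTrigger` and its parameters are hypothetical. This is an interpretation of the thread, not the KIP's specification.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical model of the B3 rule as described in this thread; not KIP code.
final class DlqTrigger {
    /**
     * Returns the DLQ topics a failed record would be written to.
     *
     * @param customHandler      true if the user plugged in their own exception handler
     * @param configuredDlqTopic value of errors.deadletterqueue.topic.name, or null if unset
     * @param returnedDlqTopics  topics of the DLQ records a custom handler returned
     */
    static List<String> topicsWritten(
            final boolean customHandler,
            final String configuredDlqTopic,
            final List<String> returnedDlqTopics) {
        if (customHandler) {
            // Custom handlers ignore the config: only what they return is produced.
            return returnedDlqTopics;
        }
        // Default handler: the config alone decides whether a DLQ record is produced.
        return configuredDlqTopic == null
            ? Collections.<String>emptyList()
            : Collections.singletonList(configuredDlqTopic);
    }
}
```

Read this way, setting the config while a custom handler returns no records sends nothing to the DLQ. That is the behavior Damien says needs to be documented.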
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
Hi Bruno,

We focused our effort (well, mostly Seb and Loic :)) on KIP-1033; that's why not much progress has been made on this one yet. Regarding your points:

B1: It is up to the user to specify the DLQ topic name and to implement any differentiation. I tend to think that having one DLQ per application ID is the wisest, but I have encountered applications that preferred a DLQ topic shared between multiple applications, e.g. to reduce the number of partitions or to ease monitoring.

B2: Good catch, it should be ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG. We removed the "DEFAULT" prefix during the discussion; it looks like I forgot to update all occurrences in the KIP.

B3: A record would be sent to the DLQ if ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG is set OR if the user implemented a custom exception handler that returns DLQ records. ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG would only change the behavior of the default handlers, so custom exception handlers would completely ignore this parameter. I think it's a good trade-off between providing a production-ready default implementation and providing sufficient flexibility for complex use-cases. This behavior definitely needs to be documented, but I think it's safe to push the responsibility for the DLQ records to the user if they implement custom handlers.

Cheers,
Damien

On Tue, 11 Jun 2024 at 17:00, Bruno Cadonna wrote:
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
Hi,

Since there has not been much activity in this thread recently, I was wondering what the status of this discussion is.

I cannot find the examples in the KIP that Sébastien mentioned in the last message to this thread. I also cannot find the corresponding definition of the following method call in the KIP:

FAIL.withDeadLetterQueueRecord(record, "dlq-topic")

I also have some comments:

B1
Did you consider prefixing the dead letter queue topic names with the application ID to distinguish the topics between Streams apps? Or is the user responsible for the differentiation? If the user is responsible, we risk that faulty records of different Streams apps end up in the same dead letter queue.

B2
Is the name of the dead letter queue topic config DEFAULT_ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG or ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG? Both names are used in the KIP.

B3
What exactly is the trigger to send a record to the dead letter queue? Is it setting ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, or is it adding a record to the return value of the exception handler?
What happens if I set ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG but do not add a record to the return value of the handler? What happens if I do not set ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG but add a record to the return value of the handler?

Best,
Bruno

On 4/22/24 10:19 PM, Sebastien Viale wrote:
> Hi,
>
> Thanks for your remarks.
>
> L1. I would say "who can do the most can do the least": even though most people will fail and stop, we found it interesting to offer the possibility to fail-and-send-to-DLQ.
>
> L2: We did not consider extending the TimestampExtractor because we consider it out of scope for this KIP. Perhaps it will be possible to include it in an ExceptionHandler later.
>
> L3: We will include an example in the KIP, but as we mentioned earlier, the DLQ topic can be different in each custom exception handler.
>
> When providing custom handlers, users would have the possibility to return:
> * FAIL
> * CONTINUE
> * FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
> * CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic")
>
> Cheers!
> Sébastien
>
> From: Lucas Brutschy
> Sent: Monday, 22 April 2024 14:36
> To: dev@kafka.apache.org
> Subject: [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
>
> Hi!
>
> Thanks for the KIP, great stuff.
>
> L1. I was a bit confused that the default configuration (once you set a DLQ topic) is going to be fail-and-send-to-DLQ, if I understood correctly. Is this something that will be a common use-case, and is it a configuration that we want to encourage? I expected that you would either want to fail or skip-and-send-to-DLQ.
>
> L2. Have you considered extending the `TimestampExtractor` interface so that it can also produce to DLQ? AFAIK it's not covered by any of the existing exception handlers, but it can cause similar failures (potentially custom logic, depending on the validity of the input record). There could also be a default implementation as a subclass of `ExtractRecordMetadataTimestamp`.
>
> L3. It would be nice to include an example of how to produce to multiple topics in the KIP, as I can imagine that this will be a common use-case. I wasn't sure how much code would be involved to make it work. If a lot of code is required, we may want to consider exposing some utils that make it easier.
>
> Cheers,
> Lucas
>
> On Sun, Apr 21, 2024 at 7:58 PM Damien Gasparina wrote:
> > Hi everyone,
> >
> > Following all the discussion on this KIP and KIP-1033, we introduced a new container class containing only processing context metadata: ProcessingMetadata.
> > This new container class is actually part of KIP-1033; thus, I added a hard dependency of this KIP on KIP-1033, which I think is the wisest choice implementation-wise.
> >
> > I also clarified the interface of the enums: withDeadLetterQueueRecords(Iterable> deadLetterQueueRecords). Very likely most users would just send one DLQ record, but there might be specific use-cases, and what can do more can do less, so I added an Iterable.
> >
> > I took some time to think about the impact of storing the ProcessingMetadata for the ProductionExceptionHandler. I think storing the topic/offset/partition should be fine, but I am concerned about storing the rawSourceKey/Value. It could impact some specific use-cases: for example, a high-throughput Kafka Streams application "counting" messages could have huge source input messages and very small sink messages; here, storing the rawSourceKey/Value could require significantly more memory than the actual Kafka Producer buffer.
> >
> > I think the safest approach is actually to only store the fixed-size metadata for the ProductionExceptionHandler.handle:
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
Hi Sophie,

Thanks for your feedback. Completing Damien's comments here for points S1 and S5b.

S1:
> I'm confused -- are you saying that we're introducing a new kind of
> ProducerRecord class for this?

I am wondering if it makes sense to alter the ProducerRecord from the Clients API with a "deadLetterQueueTopicName" attribute dedicated to the Kafka Streams DLQ.
Adding "deadLetterQueueTopicName" as an additional parameter to "withDeadLetterQueueRecord" is a good option, and may allow users to send records to different DLQ topics depending on conditions:

    @Override
    public ProductionExceptionHandlerResponse handle(final ProcessingContext context,
                                                     ProducerRecord<byte[], byte[]> record,
                                                     Exception exception) {
        if (condition1) {
            return ProductionExceptionHandlerResponse.CONTINUE
                .withDeadLetterQueueRecord(record, "dlq-topic-a");
        }
        if (condition2) {
            return ProductionExceptionHandlerResponse.CONTINUE
                .withDeadLetterQueueRecord(record, "dlq-topic-b");
        }
        return ProductionExceptionHandlerResponse.CONTINUE
            .withDeadLetterQueueRecord(record, "dlq-topic-c");
    }

S5b:
> I was having a bit of trouble understanding what the behavior would be if
> someone configured a "errors.deadletterqueue.topic.name" but didn't implement
> the handlers.

The provided LogAndContinueExceptionHandler, LogAndFailExceptionHandler and DefaultProductionExceptionHandler should be able to tell whether records should be sent to the DLQ or not.
The "errors.deadletterqueue.topic.name" config serves to:
* Specify whether the provided handlers should send records to the DLQ:
  * if the value is empty, the handlers should not send records to the DLQ;
  * if the value is not empty, the handlers should send records to the DLQ.
* Define the name of the DLQ topic that should be used by the provided handlers.

Thus, if "errors.deadletterqueue.topic.name" is defined, the provided handlers should return either:
* CONTINUE.withDeadLetterQueueRecord(record, defaultDeadLetterQueue)
* FAIL.withDeadLetterQueueRecord(record, defaultDeadLetterQueue)
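The rule above for the provided handlers (empty value: no DLQ record; non-empty value: attach a DLQ record for that topic), next to a custom handler that picks its own topic as in the condition1/condition2 example earlier in this message, can be modelled in a few lines of Java. This is a minimal sketch under stated assumptions: `HandlerResponse`, `DlqRecord`, `DlqRouting`, `builtIn` and `custom` are hypothetical names, `withDeadLetterQueueRecord` here takes raw key/value bytes instead of a record, the exception types stand in for the placeholder conditions, and none of it is the KIP's or Kafka's actual API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical DLQ record: target topic plus raw key/value bytes.
record DlqRecord(String topic, byte[] key, byte[] value) {}

// Hypothetical model of the proposed response: FAIL or CONTINUE plus any
// DLQ records attached with withDeadLetterQueueRecord(...).
final class HandlerResponse {
    enum Decision { FAIL, CONTINUE }

    final Decision decision;
    final List<DlqRecord> deadLetterQueueRecords;

    private HandlerResponse(Decision decision, List<DlqRecord> records) {
        this.decision = decision;
        this.deadLetterQueueRecords = List.copyOf(records);
    }

    static HandlerResponse fail() {
        return new HandlerResponse(Decision.FAIL, List.of());
    }

    // "continue" is a Java keyword, so this factory is named resume().
    static HandlerResponse resume() {
        return new HandlerResponse(Decision.CONTINUE, List.of());
    }

    // Returns a new response carrying one more DLQ record for the given topic.
    HandlerResponse withDeadLetterQueueRecord(byte[] key, byte[] value, String topic) {
        List<DlqRecord> records = new ArrayList<>(deadLetterQueueRecords);
        records.add(new DlqRecord(topic, key, value));
        return new HandlerResponse(decision, records);
    }
}

final class DlqRouting {
    // Built-in handlers: the config only decides whether, and where, a DLQ
    // record is attached; FAIL vs CONTINUE is the handler's own choice.
    static HandlerResponse builtIn(String dlqTopicConfig, boolean failOnError,
                                   byte[] key, byte[] value) {
        HandlerResponse base = failOnError ? HandlerResponse.fail() : HandlerResponse.resume();
        if (dlqTopicConfig == null || dlqTopicConfig.isEmpty()) {
            return base; // empty config: nothing is sent to a DLQ
        }
        return base.withDeadLetterQueueRecord(key, value, dlqTopicConfig);
    }

    // Custom handler: ignores the config and chooses a topic per failure
    // (the exception types are placeholders for arbitrary conditions).
    static HandlerResponse custom(Exception e, byte[] key, byte[] value) {
        HandlerResponse resume = HandlerResponse.resume();
        if (e instanceof IllegalArgumentException) {
            return resume.withDeadLetterQueueRecord(key, value, "dlq-topic-a");
        }
        if (e instanceof IllegalStateException) {
            return resume.withDeadLetterQueueRecord(key, value, "dlq-topic-b");
        }
        return resume.withDeadLetterQueueRecord(key, value, "dlq-topic-c");
    }
}
```

Keeping the two paths separate matches Damien's B3 answer earlier in the thread: the config changes only the bundled handlers, while a custom handler is fully responsible for the DLQ records it returns.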
If "errors.deadletterqueue.topic.name" is defined but neither a DeserializationExceptionHandler nor a ProductionExceptionHandler class is defined in the configuration, then nothing should happen, as sending to the DLQ is based on the handlers' response.

When providing custom handlers, users would have the possibility to return:
* FAIL
* CONTINUE
* FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
* CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic")

A DLQ topic name is currently required for the last two response types. I am wondering whether it would benefit users to ease the use of the default DLQ topic "errors.deadletterqueue.topic.name" when implementing custom handlers, with something like:
* FAIL.withDefaultDeadLetterQueueRecord(record)
* CONTINUE.withDefaultDeadLetterQueueRecord(record)

Regards,
Loïc

From: Damien Gasparina
Sent: Sunday, 14 April 2024 20:24
To: dev@kafka.apache.org
Subject: [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
Hi Sophie,

Thanks a lot for your feedback and your detailed comments.

S1.
> I'm confused -- are you saying that we're introducing a new kind of ProducerRecord class for this?

Sorry for the poor wording, that's not what I meant. While writing the KIP, I was hesitating between:
1. leveraging the Kafka Producer ProducerRecord;
2. the Kafka Streams ProducerRecord + a topic name in a separate parameter;
3. a new custom interface (e.g. DeadLetterQueueRecord).
As the KafkaProducer ProducerRecord is not used in the Kafka Streams API (except in the ProductionExceptionHandler) and I would like to avoid a new interface if not strictly required, I leaned toward option 2. Thinking about it, maybe option 1 would be best, but I assume it could create confusion with the Kafka Streams ProducerRecord. Let me sleep on it.

S2. I agree. Following the discussion in KIP-1033 and KIP-1034 and your point in S4, it seems more and more likely that we will create a new container class containing only the metadata for the exception handlers. To be consistent, I think we should use this new implementation in all exception handlers.
The only issue I can think of is that the new interface would expose less data than the current ProcessorContext in the DeserializationExceptionHandler (e.g. stateDir(), metrics(), getStateStore()), so it could be hard for some users to migrate to the new interface.
I expect that only a few users would be impacted, as the javadoc is very clear: `Note, that the passed in {@link ProcessorContext} only allows access to metadata like the task ID.`

S3. I completely agree with you; it is something that might not be trivial and should be thoroughly covered by unit tests during the implementation.

S4. Good point, I did not notice that the ProductionExceptionHandler is also invoked in the producer.send() callback.
Capturing the ProcessingContext for each in-flight message is probably not possible.
I think there is no other way than to write a custom container class holding only the essential metadata. I am thinking of storing the following attributes: source topic, partition, offset, rawKey, rawValue and taskId.
Those metadata should be relatively small, but I assume that there could be a high number of in-flight messages, especially with the at-least-once processing guarantee. Do you think it would be fine memory-wise?

S5. As many exceptions are only accessible in exception handlers, and we wanted to 1) allow users to customize the DLQ records and 2) have a suitable out-of-the-box DLQ implementation, we felt it natural to rely on exception handlers; that's also why we created KIP-1033. Piggybacking on the enum response was the cleanest way we could think of, but we are completely open to suggestions.

S5a. I completely agree with you on this point: for this DLQ approach to be complete, the ProcessingExceptionHandler introduced in KIP-1033 is required. KIP-1033 is definitely our first priority. We decided to kick off the KIP-1034 discussion as we expected the discussions to be dynamic and to potentially impact some choices of KIP-1033.

S5b. In this KIP, we wanted to 1) provide as much flexibility to the user as possible; 2) provide a good default implementation for the DLQ without having to write custom exception handlers.
For the default implementation, we introduced a new configuration: errors.deadletterqueue.topic.name.

If this configuration is set, it changes the behavior of the provided exception handlers to return a DLQ record containing the raw key/value + headers + exception metadata in headers.
If the out-of-the-box implementation is not suitable for a user, e.g. the payload needs to be masked in the DLQ, they could implement their own exception handlers. The errors.deadletterqueue.topic.name config would only impact the exception handlers bundled with Kafka Streams (e.g. org.apache.kafka.streams.errors.LogAndContinueExceptionHandler).

Let me update the KIP to make it clear and also provide examples.

S6/S7. Good point, mea culpa for the camel case, it must have been a sugar rush :)

Thanks again for your detailed comments and for pointing out S4 (production exception & Processing Context)!

Cheers,
Damien

On Fri, 12 Apr 2024 at 22:04, Sophie Blee-Goldman wrote:
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
Thanks for the KIP, this will make a lot of people very happy.

Wanted to chime in on a few points that have been raised so far and add some of my own (numbering with an S to distinguish my points from the previous ones).

S1.
> 1.a I really meant ProducerRecord, that's the class used to forward to
> downstream processors in the PAPI. The only information missing in
> this class is the topic name. I also considered relying on the Kafka
> Producer ProducerRecord, but I assume it would not be consistent with
> the KafkaStreams API.

I'm confused -- are you saying that we're introducing a new kind of ProducerRecord class for this? Why not just use the existing one, i.e. the o.a.k.clients.producer.ProducerRecord class? This is what the ProductionExceptionHandler uses, so it's definitely "consistent". In other words, we can remove the "String deadLetterQueueTopicName".

S2.
I think this would be a good opportunity to also deprecate the existing #handle method of the DeserializationExceptionHandler, and replace it with one that uses a ProcessingContext instead of the ProcessorContext. Partly for the same reasons about guarding access to the #forward methods, partly because this method needs to be migrated to the new PAPI interface anyway, and ProcessingContext is part of the new one.

S3.
Regarding 2a. -- I'm inclined to agree that records which a Punctuator failed to produce should also be sent to the DLQ via the ProductionExceptionHandler. Users will just need to be careful about accessing certain fields of the ProcessingContext that aren't available in the punctuator, and need to check the Optional returned by the ProcessingContext#recordMetadata API.
Also, from an implementation standpoint, it will be really hard to distinguish between a record created by a punctuator vs a processor from within the RecordCollector, which is the class that actually handles sending records to the Streams Producer and invoking the ProductionExceptionHandler.
This is because the RecordCollector is at the "end" of the topology graph and doesn't have any context about which of the upstream processors actually attempted to forward a record.

This in itself is at least theoretically solvable, but it leads into my first major new point:

S4:
I'm deeply worried about passing the ProcessingContext in as a means of forwarding metadata. The problem is that the processing/processor context is a mutable class and is inherently meaningless outside the context of a specific task. And when I said earlier that the RecordCollector sits at the "end" of the topology, I meant that it's literally outside the task's subtopology and is used/shared by all tasks on that StreamThread. So to begin with, there's no guarantee what will actually be returned for essential methods such as the new #rawSourceKey/Value or the existing #recordMetadata.

For serialization exceptions it'll probably be correct, but for general send errors it almost definitely won't be. In short, this is because we send records to the producer after the sink node, but don't check for send errors right away, since obviously it takes some time for the producer to actually send. In other words, sending/producing records is actually done asynchronously with processing, and we simply check for errors on any previously-sent records during the send on a new record in a sink node. This means the context we would be passing in for a (non-serialization) exception would pretty much always correspond not to the record that experienced the error, but to the random record that happened to be being sent when we checked and saw the error for the failed record.

This discrepancy, in addition to the whole "sourceRawKey/Value and recordMetadata are null for punctuators" issue, seems like an insurmountable inconsistency that is more likely to cause users confusion or problems than be helpful.
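The asynchronous-send problem described in S4 can be illustrated with a small, self-contained simulation. All names are hypothetical and no Kafka classes are used. It contrasts reporting a failure with whatever context happens to be current when the error is noticed, versus attaching a small snapshot captured when the failed record was sent, along the lines of the fixed-size metadata container discussed in this thread (leaving out the raw key/value here is an assumption).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical fixed-size snapshot taken when a record is sent
// (source topic, partition, offset, task ID; raw key/value deliberately omitted).
record SendMetadata(String sourceTopic, int partition, long offset, String taskId) {}

// Simplified stand-in for a collector shared by all tasks on a thread:
// send failures are only checked when the next record is sent.
final class LazyErrorCheckSimulation {
    private SendMetadata currentContext; // overwritten on every send
    private final List<SendMetadata> pendingFailures = new ArrayList<>();

    final List<SendMetadata> reportedWithCurrentContext = new ArrayList<>();
    final List<SendMetadata> reportedWithSnapshot = new ArrayList<>();

    // 'failsLater' simulates a send whose error is only noticed on a later send.
    void send(SendMetadata metadata, boolean failsLater) {
        currentContext = metadata;
        checkEarlierFailures(); // errors of previous sends surface here
        if (failsLater) {
            pendingFailures.add(metadata); // remember this send's snapshot
        }
    }

    private void checkEarlierFailures() {
        for (SendMetadata failed : pendingFailures) {
            reportedWithCurrentContext.add(currentContext); // the record being sent now
            reportedWithSnapshot.add(failed);               // the record that actually failed
        }
        pendingFailures.clear();
    }
}
```

In this simulation only failing sends are tracked because the outcome is known up front. A real implementation cannot know that in advance, so it would have to keep a snapshot for every in-flight record until its send completes, which is exactly the memory concern raised in the thread.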
We could create a new metadata object and copy over the relevant info from the ProcessingContext, but I worry that has the potential to explode memory, since we'd need to hold on to it for all in-flight records up until they are either successfully sent or failed and passed in to the ProductionExceptionHandler. But if the metadata is relatively small, it's probably fine. Especially if it's just the raw source key/value. Are there any other parts of the ProcessingContext you think should be made available?

Note that this only applies to the ProductionExceptionHandler, as the DeserializationExceptionHandler (and the newly proposed ProcessingExceptionHandler) would both be invoked immediately and therefore with the failed record's context. However, I'm also a bit uncomfortable with adding the rawSourceKey/rawSourceValue to the ProcessingContext. So I'd propose to just wrap those (and any other metadata you might want) in a container class and pass that in instead of the ProcessingContext, to all of the exception handlers.

S5:
For some reason I'm finding the proposed API a little bit awkward, although it's entirely
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
Hi Andrew,

Thanks a lot for your review, plenty of good points!

11. Typo fixed, good catch.

12. I agree with you, and Nick also mentioned it; I updated the KIP to mention that context headers should be forwarded.

13. Good catch. To be consistent with KIP-298, and without a strong opinion on my side, I updated the KIP with your prefix proposal.

14. I am not sure about this point. A big difference between KIP-298 and this one is that the handlers can easily be overridden, something that is not doable in Kafka Connect. If someone would like a different behavior, e.g. to mask the payload or include further headers, I think we should encourage them to write their own exception handlers to build the DLQ record the way they expect.

15. Yeah, that's a good point. I was not fully convinced about putting a String in it; I assume that "null" is also a valid value. In this case, I assume the stack trace and the exception are the key metadata for the user to troubleshoot the problem. I updated the KIP to mention that the value should be null if triggered in a punctuate.

16. I added a section to mention that Kafka Streams would not try to automatically create the topic, and that the topic should either be automatically created or pre-created.

17. If a DLQ record cannot be sent, the exception should go to the uncaughtExceptionHandler. Let me clearly state it in the KIP.

On Fri, 12 Apr 2024 at 17:25, Damien Gasparina wrote:
> Hi Nick,
>
> 1. Good point, that's less impactful than a custom interface, I just updated the KIP with the new signature.
>
> 1.a I really meant ProducerRecord, that's the class used to forward to downstream processors in the PAPI. The only information missing in this class is the topic name. I also considered relying on the Kafka Producer ProducerRecord, but I assume it would not be consistent with the KafkaStreams API.
>
> 2. Agreed
>
> 2.a I do think exceptions occurring during punctuate should be included in the DLQ.
> Even if building a suitable payload is almost impossible, even with > custom code; those exceptions are still fatal for Kafka Streams by > default and are something that can not be ignored safely. > I do assume that most users would want to be informed if an error > happened during a punctuate, even if only the metadata (e.g. > stacktrace, exception) is provided. > I am only concerned flooding the DLQ topic as, if a scheduled > operation failed, very likely it will fails during the next > invocation, but > > 4. Good point, I clarified the wording in the KIP to make it explicit. > > 5. Good point, I will clearly mention that it is out of scope as part > of the KIP and might not be as trivial as people could expect. I will > update the KIP once I do have some spare time. > > 6. Oh yeah, I didn't think about it, but forwarding input headers > would definitely make sense. Confluent Schema Registry ID is actually > part of the payload, but many correlation ID and technical metadata > are passed through headers, it makes sense to forward them, specially > as it is the default behavior of Kafka Streams, > > > > On Fri, 12 Apr 2024 at 15:25, Nick Telford wrote: > > > > Hi Damien and Sebastien, > > > > 1. > > I think you can just add a `String topic` argument to the existing > > `withDeadLetterQueueRecord(ProducerRecord > > deadLetterQueueRecord)` method, and then the implementation of the > > exception handler could choose the topic to send records to using whatever > > logic the user desires. You could perhaps provide a built-in implementation > > that leverages your new config to send all records to an untyped DLQ topic? > > > > 1a. > > BTW you have a typo: in your DeserializationExceptionHandler, the type of > > your `deadLetterQueueRecord` argument is `ProducerRecord`, when it should > > probably be `ConsumerRecord`. > > > > 2. > > Agreed. 
I think it's a good idea to provide an implementation that sends to > > a single DLQ by default, but it's important to enable users to customize > > this with their own exception handlers. > > > > 2a. > > I'm not convinced that "errors" (e.g. failed punctuate) should be sent to a > > DLQ topic like it's a bad record. To me, a DLQ should only contain records > > that failed to process. I'm not even sure how a user would > > re-process/action one of these other errors; it seems like the purview of > > error logging to me? > > > > 4. > > My point here was that I think it would be useful for the KIP to contain an > > explanation of the behavior both with KIP-1033 and without it. i.e. clarify > > if/how records that throw an exception in a processor are handled. At the > > moment, I'm assuming that without KIP-1033, processing exceptions would not > > cause records to be sent to the DLQ, but with KIP-1033, they would. If this > > assumption is correct, I think it should be made explicit in the KIP. > > > > 5. > > Understood. You may want to make this explicit in the documentation for > > users, so they understand the conse
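A minimal Java sketch of the default DLQ record described in points 12 and 15: the raw key/value and the input headers are copied as-is, and in a punctuate (no source record) the key and value stay null while the exception is carried in `__streams.errors.`-prefixed headers. `DlqHeader`, `DlqRecord` and `DefaultDlqRecords` are hypothetical stand-ins, not Kafka Streams classes, and only two of the proposed error headers are shown.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Kafka record header (name plus raw bytes).
record DlqHeader(String key, byte[] value) {}

// Hypothetical stand-in for the record the default handler would send to the DLQ.
record DlqRecord(byte[] key, byte[] value, List<DlqHeader> headers) {}

final class DefaultDlqRecords {
    private DefaultDlqRecords() {}

    // rawKey/rawValue are null when there is no source record (e.g. a punctuate failure).
    static DlqRecord build(byte[] rawKey, byte[] rawValue, List<DlqHeader> sourceHeaders, Exception error) {
        List<DlqHeader> headers = new ArrayList<>();
        if (sourceHeaders != null) {
            headers.addAll(sourceHeaders); // forward the input headers unchanged
        }
        headers.add(utf8("__streams.errors.exception.class.name", error.getClass().getName()));
        headers.add(utf8("__streams.errors.exception.message", String.valueOf(error.getMessage())));
        // The value is never made up: it is the raw source value, or null.
        return new DlqRecord(rawKey, rawValue, List.copyOf(headers));
    }

    private static DlqHeader utf8(String name, String value) {
        return new DlqHeader(name, value.getBytes(StandardCharsets.UTF_8));
    }
}
```

Keeping the value null rather than a placeholder string means every non-null DLQ value has the same format as the source topic, which is what makes later inspection or replay tractable.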
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
Hi Nick,

1. Good point, that's less impactful than a custom interface. I just updated the KIP with the new signature.

1.a I really meant ProducerRecord; that's the class used to forward to downstream processors in the PAPI. The only information missing in this class is the topic name. I also considered relying on the Kafka producer's ProducerRecord, but I assume it would not be consistent with the Kafka Streams API.

2. Agreed.

2.a I do think exceptions occurring during punctuate should be included in the DLQ. Even if building a suitable payload is almost impossible, even with custom code, those exceptions are still fatal for Kafka Streams by default and cannot be ignored safely. I assume that most users would want to be informed if an error happened during a punctuate, even if only the metadata (e.g. stack trace, exception) is provided. I am only concerned about flooding the DLQ topic: if a scheduled operation failed, it will very likely fail again during the next invocation, but

4. Good point, I clarified the wording in the KIP to make it explicit.

5. Good point, I will clearly state that it is out of scope for this KIP and might not be as trivial as people could expect. I will update the KIP once I have some spare time.

6. Oh yeah, I didn't think about it, but forwarding input headers would definitely make sense. The Confluent Schema Registry ID is actually part of the payload, but many correlation IDs and other technical metadata are passed through headers, so it makes sense to forward them, especially as it is the default behavior of Kafka Streams.

On Fri, 12 Apr 2024 at 15:25, Nick Telford wrote:
> Hi Damien and Sebastien,
> [...]
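Point 1 adopts the suggestion of passing the DLQ topic alongside the DLQ record. A minimal Java sketch of what such a handler response could look like, assuming hypothetical `FailedRecord`, `DlqTarget` and `HandlerResponse` types; only the method name `withDeadLetterQueueRecord` comes from the thread, and the argument order is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the record handed to the DLQ (raw key and value).
record FailedRecord(byte[] key, byte[] value) {}

// Hypothetical pairing of a DLQ record with the topic chosen by the handler.
record DlqTarget(String topic, FailedRecord record) {}

// Hypothetical handler response; not a Kafka Streams class.
final class HandlerResponse {
    private final boolean continueProcessing;
    private final List<DlqTarget> targets = new ArrayList<>();

    private HandlerResponse(boolean continueProcessing) {
        this.continueProcessing = continueProcessing;
    }

    static HandlerResponse resume() { return new HandlerResponse(true); }

    static HandlerResponse fail() { return new HandlerResponse(false); }

    // The extra topic argument lets each handler decide where the record goes.
    HandlerResponse withDeadLetterQueueRecord(FailedRecord record, String topic) {
        targets.add(new DlqTarget(topic, record));
        return this;
    }

    boolean shouldContinue() { return continueProcessing; }

    List<DlqTarget> targets() { return List.copyOf(targets); }
}
```

Because the topic travels with each record, a built-in handler could simply pass the single configured DLQ topic every time, while custom handlers remain free to route records elsewhere.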
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
Hi Damien, Sebastien and Loic,

Thanks for the KIP. The DLQ pattern is well established and bringing this to Kafka Streams is a good improvement. I do plan to add DLQ support to share groups in the future, once KIP-932 is complete. Having broad support in Kafka for DLQs is great.

I have a few comments.

11. Tiny typo “Deal letter queue”.

12. Copying across the raw key and value, then adding headers, is a good model and is shared with DLQs in Kafka Connect. It is important that pre-existing headers are also propagated, not just the key and value.

13. In terms of naming headers, I suggest that you follow KIP-298. It uses a common prefix for all added headers, which is unlikely to clash with pre-existing headers. For example, you could use:
__streams.errors.topic
__streams.errors.partition
__streams.errors.offset
__streams.errors.exception.class.name
__streams.errors.exception.stacktrace
__streams.errors.exception.message

14. Also in common with KIP-298, I think you should be able to opt out of the additional headers and just copy the raw record onto the DLQ topic.

15. I would not make up the record value for situations in which you do not have a record value. You can use an exception message header if an exception is the reason why there’s no record value. You could use an additional header for the punctuate case if required, or perhaps even create an exception class specifically for this case.

16. What happens if the DLQ topic does not exist? If topic auto-create is enabled, the broker would automatically create it with default options. The KIP probably ought to say what happens when the topic doesn’t exist.

17. What happens if the record cannot be put onto the DLQ? I suppose KafkaStreams stops.

Thanks,
Andrew

> On 12 Apr 2024, at 14:24, Nick Telford wrote:
>
> Hi Damien and Sebastien,
> [...]
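A minimal Java sketch of points 13 and 14: every added header shares the KIP-298-style `__streams.errors.` prefix, and an opt-out flag copies the raw record without them. The `ErrorHeaders` class, the flag, and encoding partition and offset as UTF-8 strings are assumptions for illustration; a `Map` is used for brevity, although real Kafka headers may repeat a key.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical builder for the prefixed error headers; not a Kafka API.
final class ErrorHeaders {
    static final String PREFIX = "__streams.errors.";

    private ErrorHeaders() {}

    static Map<String, byte[]> build(String topic, int partition, long offset,
                                     Throwable error, boolean includeErrorHeaders) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        if (!includeErrorHeaders) {
            return headers; // opt-out: only the raw record is copied to the DLQ
        }
        put(headers, "topic", topic);
        put(headers, "partition", Integer.toString(partition));
        put(headers, "offset", Long.toString(offset));
        put(headers, "exception.class.name", error.getClass().getName());
        put(headers, "exception.message", String.valueOf(error.getMessage()));
        StringWriter trace = new StringWriter();
        error.printStackTrace(new PrintWriter(trace));
        put(headers, "exception.stacktrace", trace.toString());
        return headers;
    }

    // A shared prefix keeps added headers from clashing with pre-existing ones.
    private static void put(Map<String, byte[]> headers, String suffix, String value) {
        headers.put(PREFIX + suffix, value.getBytes(StandardCharsets.UTF_8));
    }
}
```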
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
Hi Damien and Sebastien,

1.
I think you can just add a `String topic` argument to the existing `withDeadLetterQueueRecord(ProducerRecord deadLetterQueueRecord)` method, and then the implementation of the exception handler could choose the topic to send records to using whatever logic the user desires. You could perhaps provide a built-in implementation that leverages your new config to send all records to an untyped DLQ topic?

1a.
BTW you have a typo: in your DeserializationExceptionHandler, the type of your `deadLetterQueueRecord` argument is `ProducerRecord`, when it should probably be `ConsumerRecord`.

2.
Agreed. I think it's a good idea to provide an implementation that sends to a single DLQ by default, but it's important to enable users to customize this with their own exception handlers.

2a.
I'm not convinced that "errors" (e.g. failed punctuate) should be sent to a DLQ topic like it's a bad record. To me, a DLQ should only contain records that failed to process. I'm not even sure how a user would re-process/action one of these other errors; it seems like the purview of error logging to me?

4.
My point here was that I think it would be useful for the KIP to contain an explanation of the behavior both with KIP-1033 and without it, i.e. clarify if/how records that throw an exception in a processor are handled. At the moment, I'm assuming that without KIP-1033, processing exceptions would not cause records to be sent to the DLQ, but with KIP-1033, they would. If this assumption is correct, I think it should be made explicit in the KIP.

5.
Understood. You may want to make this explicit in the documentation for users, so they understand the consequences of re-processing data sent to their DLQ. The main reason I raised this point is that it's something that has tripped me up in numerous KIPs, and that committers frequently remind me of, so I wanted to get ahead of it for once! :D

And one new point:
6.
The DLQ record schema appears to discard all custom headers set on the source record. Is there a way these can be included? In particular, I'm concerned with "schema pointer" headers (like those set by Schema Registry), that may need to be propagated, especially if the records are fed back into the source topics for re-processing by the user.

Regards,
Nick

On Fri, 12 Apr 2024 at 13:20, Damien Gasparina wrote:
> Hi Nick,
> [...]
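One way a custom exception handler could use the `String topic` argument from point 1: a hypothetical `DlqTopicRouter` that sends everything to a single configured DLQ topic when one is set, and otherwise derives one DLQ per source topic so records with different schemas do not mix. The class and the `-dlq` suffix are assumed conventions, not part of the KIP.

```java
// Hypothetical policy a custom exception handler could use to pick the DLQ topic.
final class DlqTopicRouter {
    private final String defaultDlqTopic; // null when no single DLQ topic is configured

    DlqTopicRouter(String defaultDlqTopic) {
        this.defaultDlqTopic = defaultDlqTopic;
    }

    String topicFor(String sourceTopic) {
        if (defaultDlqTopic != null) {
            return defaultDlqTopic; // one untyped DLQ for the whole application
        }
        if (sourceTopic == null) {
            // e.g. a punctuate failure: there is no source topic to derive a name from
            throw new IllegalStateException("no source topic and no default DLQ topic configured");
        }
        return sourceTopic + "-dlq"; // one DLQ per source topic, so schemas do not mix
    }
}
```

The punctuate branch shows why a per-source-topic default is opinionated: errors without a source record still need some fallback destination.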
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
Hi Nick,

Thanks a lot for your review and your useful comments!

1. It is a good point. As you mentioned, I think it would make sense in some use cases to have multiple DLQ topics, so we should provide an API to let users do it. Thinking out loud here, maybe a better approach is to create a new Record class containing the topic name, e.g. DeadLetterQueueRecord, and change the signature to withDeadLetterQueueRecords(Iterable deadLetterQueueRecords) instead of withDeadLetterQueueRecord(ProducerRecord deadLetterQueueRecord). What do you think? DeadLetterQueueRecord would be something like "class DeadLetterQueueRecord extends org.apache.kafka.streams.processor.api.ProducerRecord { String topic; /* + getter/setter + */ }"

2. I think the root question here is: should we have one DLQ topic or multiple DLQ topics by default? This question highly depends on the context, but a default implementation handling multiple DLQ topics would be opinionated, e.g. how should errors in a punctuate be managed?
I think it makes sense to have the default implementation write all faulty records to a single DLQ; that's at least the approach I used in past applications: one DLQ per Kafka Streams application. Of course, the message format could vary within the DLQ, e.g. depending on the source topic, but those DLQ records will very likely be troubleshot, and maybe replayed, manually anyway.
If a user needs multiple DLQ topics or wants to enforce a specific schema, it's still possible, but they would need to implement custom exception handlers.
Coming back to 1., I agree that it would make sense to let the user set the DLQ topic name in the handlers for more flexibility.

3. Good point, sorry, it was a typo; the ProcessingContext makes much more sense here indeed.

4. I assume that we could implement KIP-1033 (Processing exception handler) independently from KIP-1034. I do hope that KIP-1033 will be adopted and implemented before KIP-1034, but if that's not the case, we could implement KIP-1034 independently and update KIP-1033 to include the DLQ record afterward (in the same KIP, or in a new one if that is not possible).

5. I think we should be clear that this KIP only covers the DLQ record produced. Everything related to replaying messages or recovery plans should be considered out of scope, as it is use-case and error specific.

Let me know if anything is unclear; there are definitely points that are highly debatable.

Cheers,
Damien

On Fri, 12 Apr 2024 at 13:00, Nick Telford wrote:
> Oh, and one more thing:
> [...]
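A minimal Java sketch of the alternative floated in point 1: a record type that carries its destination topic, plus a response that accepts an `Iterable` of them. `BaseRecord` stands in for the PAPI record class and `MultiDlqResponse` is hypothetical; unlike the getter/setter version in the email, the sketch keeps the topic immutable.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the PAPI record class (key and value only).
class BaseRecord {
    private final byte[] key;
    private final byte[] value;

    BaseRecord(byte[] key, byte[] value) {
        this.key = key;
        this.value = value;
    }

    byte[] key() { return key; }

    byte[] value() { return value; }
}

// The shape floated in the thread: a record that also knows its destination topic.
class DeadLetterQueueRecord extends BaseRecord {
    private final String topic;

    DeadLetterQueueRecord(String topic, byte[] key, byte[] value) {
        super(key, value);
        this.topic = topic;
    }

    String topic() { return topic; }
}

// Hypothetical response accepting several DLQ records, possibly for different topics.
final class MultiDlqResponse {
    private final List<DeadLetterQueueRecord> records = new ArrayList<>();

    MultiDlqResponse withDeadLetterQueueRecords(Iterable<DeadLetterQueueRecord> deadLetterQueueRecords) {
        for (DeadLetterQueueRecord r : deadLetterQueueRecords) {
            records.add(r);
        }
        return this;
    }

    List<DeadLetterQueueRecord> records() { return List.copyOf(records); }
}
```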
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
Oh, and one more thing:

5.
Whenever you take a record out of the stream, and then potentially re-introduce it at a later date, you introduce the potential for record ordering issues. For example, that record could have been destined for a Window that has been closed by the time it's re-processed. I'd like to see a section that considers these consequences, and perhaps makes those risks clear to users. For the record, this is exactly what sank KIP-990, which was an alternative approach to error handling that introduced the same issues.

Cheers,

Nick

On Fri, 12 Apr 2024 at 11:54, Nick Telford wrote:
> Hi Damien,
> [...]
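A simplified Java model of the ordering risk in point 5: a record replayed from the DLQ is assigned to the same tumbling window as before, but if stream time has since advanced past the window end plus the grace period, the window is closed and the record is dropped as late. `WindowCloseCheck` is hypothetical and only approximates Kafka Streams' windowed-aggregation behaviour; all times are in milliseconds.

```java
// Hypothetical, simplified model of tumbling windows; not Kafka Streams code.
final class WindowCloseCheck {
    private WindowCloseCheck() {}

    // Start of the tumbling window that contains the given timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // True if the record's window is already closed at the observed stream time.
    static boolean isDroppedAsLate(long recordTs, long streamTime, long sizeMs, long graceMs) {
        long windowEnd = windowStart(recordTs, sizeMs) + sizeMs;
        return windowEnd + graceMs <= streamTime;
    }
}
```

For a one-minute window, a record with timestamp 10 s is accepted while stream time is still 10 s, but the same record replayed an hour later is dropped even with a five-minute grace period, because its window `[0, 60 s)` closed long ago.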
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
Hi Damien,

Thanks for the KIP! Dead-letter queues are something that I think a lot of users would like.

I think there are a few points with this KIP that concern me:

1. It looks like you can only define a single, global DLQ for the entire Kafka Streams application? What about applications that would like to define different DLQs for different data flows? This is especially important when dealing with multiple source topics that have different record schemas.

2. Your DLQ payload value can either be the record value that failed, or an error string (such as "error during punctuate"). This is likely to cause problems when users try to process the records from the DLQ, as they can't rely on every record value having the same format. This is very loosely related to point 1 above.

3. You provide a ProcessorContext to both exception handlers, but state that it cannot be used to forward records. In that case, I believe you should use ProcessingContext instead, which statically guarantees that it can't be used to forward records.

4. You mention the KIP-1033 ProcessingExceptionHandler, but what's the plan if KIP-1033 is not adopted, or if KIP-1034 lands before KIP-1033?

Regards,
Nick
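Nick's first two points (a single global DLQ, and DLQ values that are sometimes the failed record and sometimes an error string) can be illustrated with a small sketch. It is not the KIP's API: `DlqEnvelope`, `DlqRouter`, and the topic names used with them are hypothetical. The sketch picks a DLQ topic per source topic (falling back to a default) and always wraps a failure in the same envelope shape, so a DLQ consumer never has to guess what kind of value it is reading.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical envelope (not the KIP's record format): every DLQ record has the
// same shape, whether or not a source record was available.
record DlqEnvelope(String sourceTopic,   // null for failures with no source record, e.g. punctuation
                   byte[] rawKey,        // null when no source record is available
                   byte[] rawValue,      // null when no source record is available
                   String errorMessage) {}

// Hypothetical router: chooses a DLQ topic per source topic instead of a single global DLQ.
final class DlqRouter {
    private final Map<String, String> dlqBySourceTopic;
    private final String defaultDlqTopic;

    DlqRouter(Map<String, String> dlqBySourceTopic, String defaultDlqTopic) {
        this.dlqBySourceTopic = Map.copyOf(dlqBySourceTopic);
        this.defaultDlqTopic = defaultDlqTopic;
    }

    // Failures from unmapped topics, or with no source topic at all, go to the default DLQ.
    String route(DlqEnvelope failure) {
        return Optional.ofNullable(failure.sourceTopic())
                .map(dlqBySourceTopic::get)
                .orElse(defaultDlqTopic);
    }
}
```

Because the envelope shape is fixed, every DLQ record can be decoded the same way; a real implementation would still have to choose how to serialize it (for example, error details in headers and the raw bytes as key/value).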
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
In a general way, if the user does not configure the right ACLs, that would be a security issue, but that's true for any topic.

This KIP allows users to configure a Dead Letter Queue in Kafka Streams without writing custom Java code; it works at the Kafka Streams level, not at the topic level. A lot of applications already implement this pattern, but the code required to do it is quite painful and error-prone: for example, most apps I have seen create a new KafkaProducer just to send records to their DLQ.

As the DLQ would be disabled by default for backward compatibility, I doubt it would generate any security concern. If a user explicitly configures a Dead Letter Queue, it would be up to them to configure the relevant ACLs to ensure that the right principal can access it. This is already the case for all internal, input, and output Kafka Streams topics (e.g. repartition and changelog topics), which could also contain confidential data, so I do not think we should implement a different behavior for this one.

In this KIP, we configured the default DLQ record to contain the initial record key/value, as we assume that this is the expected and desired behavior for most applications. If a user does not want the key/value in the DLQ record for any reason, they could still implement exception handlers to build their own DLQ record.

Regarding ACLs, maybe something smarter could be done in Kafka Streams, but this is out of scope for this KIP.
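Damien's reply notes that users who do not want the original key/value in the DLQ can build their own DLQ record in a custom exception handler. The sketch below shows that idea in isolation; `DlqRecord`, `DlqRecordBuilder`, and the header names are hypothetical and are not the handler API or header names defined by the KIP. The default builder copies the raw key/value, while the redacting variant drops the payload for topics marked sensitive and keeps only error metadata.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the record a handler would send to the DLQ.
record DlqRecord(byte[] key, byte[] value, Map<String, String> headers) {}

// Hypothetical hook illustrating "build your own DLQ record" in a custom handler.
interface DlqRecordBuilder {
    DlqRecord build(String sourceTopic, byte[] rawKey, byte[] rawValue, Exception error);

    // Default behavior described in the thread: keep the original key/value, add error context.
    static DlqRecordBuilder copyOriginal() {
        return (topic, key, value, error) -> new DlqRecord(key, value, errorHeaders(topic, error));
    }

    // Variant for sensitive topics: drop the payload, keep only the error metadata.
    static DlqRecordBuilder redacting(Set<String> sensitiveTopics) {
        DlqRecordBuilder original = copyOriginal();
        return (topic, key, value, error) -> sensitiveTopics.contains(topic)
                ? new DlqRecord(null, null, errorHeaders(topic, error))
                : original.build(topic, key, value, error);
    }

    // Header names here are illustrative only.
    private static Map<String, String> errorHeaders(String topic, Exception error) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("source.topic", topic);
        headers.put("exception.class", error.getClass().getName());
        headers.put("exception.message", String.valueOf(error.getMessage()));
        return headers;
    }
}
```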
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
My concern is that someone would create a dead letter queue on a sensitive topic and not get the ACL correct from the start, potentially leaking confidential data. Is there anything in the proposal that would prevent that from happening? If so, I did not recognize it as such.

--
LinkedIn: http://www.linkedin.com/in/claudewarren
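Claude's concern can be phrased as a concrete check: the DLQ should not be readable by anyone who cannot already read the source topic. Nothing like this is part of KIP-1034; the sketch below is a hypothetical pre-flight check over a deliberately simplified ACL model (a map from topic to the principals allowed to READ it), ignoring wildcards, prefixed resource patterns, and DENY rules.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical pre-flight check (not part of KIP-1034): a DLQ should not widen
// read access compared with the source topic whose records it may contain.
final class DlqAclCheck {
    private DlqAclCheck() {}

    // readAcls maps topic name -> principals allowed to READ it (simplified ACL model).
    // Returns principals that can read the DLQ but not the source; empty means no widening.
    static Set<String> widenedReaders(Map<String, Set<String>> readAcls,
                                      String sourceTopic, String dlqTopic) {
        Set<String> widened = new TreeSet<>(readAcls.getOrDefault(dlqTopic, Set.of()));
        widened.removeAll(readAcls.getOrDefault(sourceTopic, Set.of()));
        return widened;
    }
}
```

A deployment script could run such a check before enabling the DLQ and refuse to proceed when the result is non-empty.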
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
Hi Claude,

In this KIP, the Dead Letter Queue is materialized as a standard, independent topic, so normal ACLs apply to it like to any other topic. This should not introduce any security issues; obviously, the right ACLs would need to be granted to write to the DLQ if one is configured.

Cheers,
Damien
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
I am new to the Kafka codebase, so please excuse any ignorance on my part.

When a dead letter queue is established, is there a process to ensure that it is at least defined with the same ACL as the original queue? Without such a guarantee at the start, it seems that managing dead letter queues will be fraught with security issues.

On Wed, Apr 10, 2024 at 10:34 AM Damien Gasparina wrote:
> Hi everyone,
>
> To continue our effort to improve Kafka Streams error handling, we
> propose a new KIP to add out-of-the-box support for Dead Letter Queues.
> The goal of this KIP is to provide a default implementation that
> should be suitable for most applications and allow users to override
> it if they have specific requirements.
>
> In order to build a suitable payload, some additional changes are
> included in this KIP:
> 1. extend the ProcessingContext to hold, when available, the source
> node raw key/value byte[];
> 2. expose the ProcessingContext to the ProductionExceptionHandler,
> as it is currently not available in the handle parameters.
>
> Regarding point 2, to expose the ProcessingContext to the
> ProductionExceptionHandler, we considered two choices:
> 1. exposing the ProcessingContext as a parameter in the handle()
> method. That's the cleanest way IMHO, but we would need to deprecate
> the old method.
> 2. exposing the ProcessingContext as an attribute in the interface.
> This way, no method is deprecated, but we would not be consistent with
> the other exception handlers.
>
> In the KIP, we chose solution 1 (new handle signature with the old
> one deprecated), but we could use other opinions on this part.
> More information is available directly in the KIP.
>
> KIP link:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams
>
> Feedback and suggestions are welcome,
>
> Cheers,
> Damien, Sebastien and Loic
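Solution 1 from the announcement (pass the context as a new handle() parameter and deprecate the old signature) is usually done in Java with default methods, so existing handlers keep compiling and working. The sketch below uses hypothetical, simplified types (`HandlerResponse`, `SourceContext`, `ProductionHandler`); the real Kafka Streams interfaces have different signatures, and only the evolution pattern is the point. The context is read-only (getters only), so a handler cannot forward records through it.

```java
// Hypothetical, simplified stand-ins: the real Kafka Streams handler interfaces
// have different signatures. Only the deprecation pattern is the point here.
enum HandlerResponse { CONTINUE, FAIL }

// Read-only view of the failing record's origin, including the raw source
// key/value bytes (null when unavailable). No forward() method exists here.
interface SourceContext {
    String topic();
    int partition();
    long offset();
    byte[] sourceRawKey();
    byte[] sourceRawValue();
}

interface ProductionHandler {
    // Old signature: kept for source compatibility, but deprecated.
    @Deprecated
    default HandlerResponse handle(String failedTopic, Exception exception) {
        throw new UnsupportedOperationException(
                "Implement handle(SourceContext, String, Exception)");
    }

    // New signature: the context is passed as a parameter. The default
    // delegates to the old method, so legacy implementations keep working.
    default HandlerResponse handle(SourceContext context, String failedTopic, Exception exception) {
        return handle(failedTopic, exception);
    }
}
```

The runtime would always call the new overload; handlers written against the old signature are reached through the delegating default. Solution 2 (a context attribute on the interface) avoids the deprecation, at the cost of the inconsistency with the other handlers mentioned above.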