Hi Andrew,

Thanks a lot for your review, plenty of good points!
11. Typo fixed, good catch.

12. I do agree with you, and Nick also mentioned it; I updated the KIP to mention that context headers should be forwarded.

13. Good catch. To be consistent with KIP-298, and without a strong opinion from my side, I updated the KIP with your prefix proposal.

14. I am not sure about this point. A big difference between KIP-298 and this one is that the handlers can easily be overridden, something that is not doable in Kafka Connect. If someone would like a different behavior, e.g. to mask the payload or include further headers, I think we should encourage them to write their own exception handlers to build the DLQ record the way they expect.

15. Yeah, that's a good point. I was not fully convinced about putting a String in it, and I do assume that null is also a valid value. I do assume that the stack trace and the exception are, in this case, the key metadata for the user to troubleshoot the problem. I updated the KIP to mention that the value should be null if triggered in a punctuate.

16. I added a section to mention that Kafka Streams would not try to automatically create the topic; the topic should either be automatically created (e.g. via broker auto-creation) or pre-created.

17. If a DLQ record cannot be sent, the exception should go to the uncaughtExceptionHandler. Let me clearly state it in the KIP.

On Fri, 12 Apr 2024 at 17:25, Damien Gasparina <d.gaspar...@gmail.com> wrote:
>
> Hi Nick,
>
> 1. Good point, that's less impactful than a custom interface, I just updated the KIP with the new signature.
>
> 1.a I really meant ProducerRecord, that's the class used to forward to downstream processors in the PAPI. The only information missing in this class is the topic name. I also considered relying on the Kafka Producer ProducerRecord, but I assume it would not be consistent with the KafkaStreams API.
>
> 2. Agreed
>
> 2.a I do think exceptions occurring during punctuate should be included in the DLQ.
> Even if building a suitable payload is almost impossible, even with custom code, those exceptions are still fatal for Kafka Streams by default and are something that cannot be ignored safely.
> I do assume that most users would want to be informed if an error happened during a punctuate, even if only the metadata (e.g. stacktrace, exception) is provided.
> I am only concerned about flooding the DLQ topic as, if a scheduled operation failed, it will very likely fail again during the next invocation.
>
> 4. Good point, I clarified the wording in the KIP to make it explicit.
>
> 5. Good point, I will clearly mention that it is out of scope as part of the KIP and might not be as trivial as people could expect. I will update the KIP once I do have some spare time.
>
> 6. Oh yeah, I didn't think about it, but forwarding input headers would definitely make sense. The Confluent Schema Registry ID is actually part of the payload, but many correlation IDs and technical metadata are passed through headers, so it makes sense to forward them, especially as it is the default behavior of Kafka Streams.
>
> On Fri, 12 Apr 2024 at 15:25, Nick Telford <nick.telf...@gmail.com> wrote:
> >
> > Hi Damien and Sebastien,
> >
> > 1.
> > I think you can just add a `String topic` argument to the existing `withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]> deadLetterQueueRecord)` method, and then the implementation of the exception handler could choose the topic to send records to using whatever logic the user desires. You could perhaps provide a built-in implementation that leverages your new config to send all records to an untyped DLQ topic?
> >
> > 1a.
> > BTW you have a typo: in your DeserializationExceptionHandler, the type of your `deadLetterQueueRecord` argument is `ProducerRecord`, when it should probably be `ConsumerRecord`.
> >
> > 2.
> > Agreed.
> > I think it's a good idea to provide an implementation that sends to a single DLQ by default, but it's important to enable users to customize this with their own exception handlers.
> >
> > 2a.
> > I'm not convinced that "errors" (e.g. a failed punctuate) should be sent to a DLQ topic like a bad record. To me, a DLQ should only contain records that failed to process. I'm not even sure how a user would re-process/action one of these other errors; it seems like the purview of error logging to me?
> >
> > 4.
> > My point here was that I think it would be useful for the KIP to contain an explanation of the behavior both with KIP-1033 and without it, i.e. clarify if/how records that throw an exception in a processor are handled. At the moment, I'm assuming that without KIP-1033, processing exceptions would not cause records to be sent to the DLQ, but with KIP-1033, they would. If this assumption is correct, I think it should be made explicit in the KIP.
> >
> > 5.
> > Understood. You may want to make this explicit in the documentation for users, so they understand the consequences of re-processing data sent to their DLQ. The main reason I raised this point is that it's something that has tripped me up in numerous KIPs and that committers frequently remind me of; so I wanted to get ahead of it for once! :D
> >
> > And one new point:
> > 6.
> > The DLQ record schema appears to discard all custom headers set on the source record. Is there a way these can be included? In particular, I'm concerned with "schema pointer" headers (like those set by Schema Registry) that may need to be propagated, especially if the records are fed back into the source topics for re-processing by the user.
> >
> > Regards,
> > Nick
> >
> >
> > On Fri, 12 Apr 2024 at 13:20, Damien Gasparina <d.gaspar...@gmail.com> wrote:
> > >
> > > Hi Nick,
> > >
> > > Thanks a lot for your review and your useful comments!
> > >
> > > 1.
> > > It is a good point. As you mentioned, I think it would make sense in some use cases to have potentially multiple DLQ topics, so we should provide an API to let users do it.
> > > Thinking out loud here, maybe it is a better approach to create a new Record class containing the topic name, e.g. DeadLetterQueueRecord, and to change the signature to withDeadLetterQueueRecords(Iterable<DeadLetterQueueRecord> deadLetterQueueRecords) instead of withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]> deadLetterQueueRecord). What do you think? DeadLetterQueueRecord would be something like "class DeadLetterQueueRecord extends org.apache.kafka.streams.processor.api.ProducerRecord { String topic; /* + getter/setter */ }"
> > >
> > > 2. I think the root question here is: should we have one DLQ topic or multiple DLQ topics by default? This question highly depends on the context, but implementing a default implementation to handle multiple DLQ topics would be opinionated, e.g. how to manage errors in a punctuate?
> > > I think it makes sense to have the default implementation write all faulty records to a single DLQ; that's at least the approach I used in past applications: one DLQ per Kafka Streams application. Of course the message format could change in the DLQ, e.g. due to the source topic, but those DLQ records will very likely be troubleshot, and maybe replayed, manually anyway.
> > > If a user needs to have multiple DLQ topics or wants to enforce a specific schema, it's still possible, but they would need to implement custom exception handlers.
> > > Coming back to 1., I do agree that it would make sense to have the user set the DLQ topic name in the handlers for more flexibility.
> > >
> > > 3. Good point, sorry, it was a typo; ProcessingContext makes much more sense here indeed.
> > >
> > > 4.
> > > I do assume that we could implement KIP-1033 (Processing exception handler) independently from KIP-1034. I do hope that KIP-1033 will be adopted and implemented before KIP-1034, but if that's not the case, we could implement KIP-1034 independently and update KIP-1033 to include the DLQ record afterward (in the same KIP, or in a new one if not possible).
> > >
> > > 5. I think we should be clear that this KIP only covers the DLQ record produced.
> > > Everything related to replaying messages or recovery plans should be considered out of scope, as it is use-case and error specific.
> > >
> > > Let me know if that's not clear; there are definitely points that are highly debatable.
> > >
> > > Cheers,
> > > Damien
> > >
> > > On Fri, 12 Apr 2024 at 13:00, Nick Telford <nick.telf...@gmail.com> wrote:
> > > >
> > > > Oh, and one more thing:
> > > >
> > > > 5.
> > > > Whenever you take a record out of the stream, and then potentially re-introduce it at a later date, you introduce the potential for record ordering issues. For example, that record could have been destined for a Window that has been closed by the time it's re-processed. I'd like to see a section that considers these consequences, and perhaps makes those risks clear to users. For the record, this is exactly what sunk KIP-990, which was an alternative approach to error handling that introduced the same issues.
> > > >
> > > > Cheers,
> > > >
> > > > Nick
> > > >
> > > > On Fri, 12 Apr 2024 at 11:54, Nick Telford <nick.telf...@gmail.com> wrote:
> > > > >
> > > > > Hi Damien,
> > > > >
> > > > > Thanks for the KIP! Dead-letter queues are something that I think a lot of users would like.
> > > > >
> > > > > I think there are a few points with this KIP that concern me:
> > > > >
> > > > > 1.
> > > > > It looks like you can only define a single, global DLQ for the entire Kafka Streams application? What about applications that would like to define different DLQs for different data flows? This is especially important when dealing with multiple source topics that have different record schemas.
> > > > >
> > > > > 2.
> > > > > Your DLQ payload value can either be the record value that failed, or an error string (such as "error during punctuate"). This is likely to cause problems when users try to process the records from the DLQ, as they can't guarantee the format of every record value will be the same. This is very loosely related to point 1. above.
> > > > >
> > > > > 3.
> > > > > You provide a ProcessorContext to both exception handlers, but state they cannot be used to forward records. In that case, I believe you should use ProcessingContext instead, which statically guarantees that it can't be used to forward records.
> > > > >
> > > > > 4.
> > > > > You mention the KIP-1033 ProcessingExceptionHandler, but what's the plan if KIP-1033 is not adopted, or if KIP-1034 lands before 1033?
> > > > >
> > > > > Regards,
> > > > >
> > > > > Nick
> > > > >
> > > > > On Fri, 12 Apr 2024 at 11:38, Damien Gasparina <d.gaspar...@gmail.com> wrote:
> > > > > >
> > > > > > In a general way, if the user does not configure the right ACL, that would be a security issue, but that's true for any topic.
> > > > > >
> > > > > > This KIP allows users to configure a Dead Letter Queue without writing custom Java code in Kafka Streams, not at the topic level.
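For concreteness, the configuration-only usage described above might look like the following sketch. The property name is illustrative only, following the KIP-298-style "errors.deadletterqueue." prefix mentioned earlier in this thread; the final name is defined by the KIP:

```java
import java.util.Properties;

public class DlqConfigSketch {
    public static void main(String[] args) {
        // Standard Kafka Streams configuration...
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");
        props.put("bootstrap.servers", "broker:9092");
        // ...plus a single property to enable the default DLQ behavior
        // (illustrative name; see the KIP for the actual one):
        props.put("errors.deadletterqueue.topic.name", "my-streams-app-dlq");
        System.out.println(props.getProperty("errors.deadletterqueue.topic.name"));
    }
}
```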
> > > > > > A lot of applications are already implementing this pattern, but the required code to do it is quite painful and error-prone; for example, most apps I have seen created a new KafkaProducer to send records to their DLQ.
> > > > > >
> > > > > > As it would be disabled by default for backward compatibility, I doubt it would generate any security concern.
> > > > > > If a user explicitly configures a Dead Letter Queue, it would be up to them to configure the relevant ACLs to ensure that the right principal can access it.
> > > > > > It is already the case for all internal, input and output Kafka Streams topics (e.g. repartition, changelog topics), which also could contain confidential data, so I do not think we should implement a different behavior for this one.
> > > > > >
> > > > > > In this KIP, we configured the default DLQ record to have the initial record key/value, as we assume that it is the expected and wanted behavior for most applications.
> > > > > > If a user does not want to have the key/value in the DLQ record for any reason, they could still implement exception handlers to build their own DLQ record.
> > > > > >
> > > > > > Regarding ACLs, maybe something smarter could be done in Kafka Streams, but this is out of scope for this KIP.
> > > > > >
> > > > > > On Fri, 12 Apr 2024 at 11:58, Claude Warren <cla...@xenei.com> wrote:
> > > > > > >
> > > > > > > My concern is that someone would create a dead letter queue on a sensitive topic and not get the ACL correct from the start, thus causing a potential confidential data leak. Is there anything in the proposal that would prevent that from happening? If so, I did not recognize it as such.
> > > > > > >
> > > > > > > On Fri, Apr 12, 2024 at 9:45 AM Damien Gasparina <d.gaspar...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Hi Claude,
> > > > > > > >
> > > > > > > > In this KIP, the Dead Letter Queue is materialized by a standard and independent topic, thus normal ACLs apply to it like any other topic.
> > > > > > > > This should not introduce any security issues; obviously, the right ACL would need to be provided to write to the DLQ if configured.
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Damien
> > > > > > > >
> > > > > > > > On Fri, 12 Apr 2024 at 08:59, Claude Warren, Jr <claude.war...@aiven.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > > I am new to the Kafka codebase, so please excuse any ignorance on my part.
> > > > > > > > >
> > > > > > > > > When a dead letter queue is established, is there a process to ensure that it is at least defined with the same ACL as the original queue? Without such a guarantee at the start, it seems that managing dead letter queues will be fraught with security issues.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, Apr 10, 2024 at 10:34 AM Damien Gasparina <d.gaspar...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > Hi everyone,
> > > > > > > > > >
> > > > > > > > > > To continue our effort to improve Kafka Streams error handling, we propose a new KIP to add out-of-the-box support for Dead Letter Queues.
> > > > > > > > > > The goal of this KIP is to provide a default implementation that should be suitable for most applications and allow users to override it if they have specific requirements.
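To illustrate the "override" escape hatch mentioned above, here is a minimal sketch of a custom handler routing failures from different source topics to different DLQ topics. All types are simplified stand-ins for this sketch, not the real org.apache.kafka.streams.errors interfaces, and the handler contract shown is hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

public class CustomDlqHandlerSketch {

    // Simplified stand-in for the failed record handed to the handler
    record FailedRecord(String sourceTopic, byte[] key, byte[] value) {}

    // Simplified stand-in for a producer; collects sends so the sketch is runnable
    static class Producer {
        final List<String> sent = new ArrayList<>();
        void send(String topic, FailedRecord rec) {
            sent.add(topic + ":" + new String(rec.value()));
        }
    }

    // A custom handler overriding the default behavior: route failures to a
    // per-source-topic DLQ (one of the "specific requirements" the default
    // single-topic implementation would not cover).
    static class PerTopicDlqHandler {
        final Producer producer = new Producer();

        String handle(FailedRecord rec, Exception cause) {
            producer.send("dlq-" + rec.sourceTopic(), rec);
            return "CONTINUE"; // skip the bad record and keep processing
        }
    }

    public static void main(String[] args) {
        PerTopicDlqHandler handler = new PerTopicDlqHandler();
        handler.handle(new FailedRecord("orders", null, "not-avro".getBytes()),
                       new RuntimeException("deserialization failed"));
        System.out.println(handler.producer.sent);
    }
}
```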
> > > > > > > > > >
> > > > > > > > > > In order to build a suitable payload, some additional changes are included in this KIP:
> > > > > > > > > > 1. extend the ProcessingContext to hold, when available, the source node raw key/value byte[]
> > > > > > > > > > 2. expose the ProcessingContext to the ProductionExceptionHandler; it is currently not available in the handle parameters.
> > > > > > > > > >
> > > > > > > > > > Regarding point 2., to expose the ProcessingContext to the ProductionExceptionHandler, we considered two choices:
> > > > > > > > > > 1. exposing the ProcessingContext as a parameter in the handle() method. That's the cleanest way IMHO, but we would need to deprecate the old method.
> > > > > > > > > > 2. exposing the ProcessingContext as an attribute in the interface. This way, no method is deprecated, but we would not be consistent with the other ExceptionHandlers.
> > > > > > > > > >
> > > > > > > > > > In the KIP, we chose solution 1. (new handle signature with the old one deprecated), but we could use other opinions on this part.
> > > > > > > > > > More information is available directly on the KIP.
> > > > > > > > > >
> > > > > > > > > > KIP link:
> > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams
> > > > > > > > > >
> > > > > > > > > > Feedback and suggestions are welcome,
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > > Damien, Sebastien and Loic
> > > > > > >
> > > > > > > --
> > > > > > > LinkedIn: http://www.linkedin.com/in/claudewarren
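As an illustration of solution 1. discussed in the announcement above (a new handle() overload carrying the context, with the old signature deprecated), a rough sketch follows. The types are simplified stand-ins; the real interfaces live in the Kafka Streams codebase and the final signature is defined by the KIP:

```java
public class HandleOverloadSketch {

    // Simplified stand-in for org.apache.kafka.streams.processor.api.ProcessingContext
    interface ProcessingContext {
        String taskId();
    }

    // Simplified stand-in for the record that failed to be produced
    record FailedSend(byte[] key, byte[] value) {}

    enum Response { CONTINUE, FAIL }

    interface ProductionExceptionHandler {
        // Old signature: no access to the ProcessingContext
        @Deprecated
        Response handle(FailedSend record, Exception exception);

        // Solution 1: a new overload exposing the context. The default
        // implementation delegates to the deprecated method, so existing
        // handlers keep compiling and behaving as before.
        default Response handle(ProcessingContext context, FailedSend record, Exception exception) {
            return handle(record, exception);
        }
    }

    public static void main(String[] args) {
        // A legacy handler implementing only the deprecated signature
        ProductionExceptionHandler legacy = (rec, ex) -> Response.FAIL;
        // Calling through the new overload still reaches the legacy logic
        Response r = legacy.handle(() -> "0_1", new FailedSend(null, null),
                                   new RuntimeException("send failed"));
        System.out.println(r);
    }
}
```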