Hi Andrew,

Thanks a lot for your review, plenty of good points!

11. Typo fixed, good catch.

12. I do agree with you, and Nick also mentioned it; I updated the KIP
to mention that context headers should be forwarded.

13. Good catch, to be consistent with KIP-298, and without a strong
opinion from my side, I updated the KIP with your prefix proposal.

14. I am not sure about this point. A big difference between KIP-298
and this one is that here the handlers can easily be overridden,
something that is not doable in Kafka Connect.
If someone would like a different behavior, e.g. to mask the payload
or include further headers, I think we should encourage them to write
their own exception handlers to build the DLQ record the way they
expect.
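
For example (a plain-Java sketch, independent of the final handler API in
the KIP; `maskValue` is a hypothetical helper name), a custom handler could
mask the payload before building its DLQ record:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class DlqMaskingSketch {
    // Hypothetical helper a custom exception handler could call before
    // building its DLQ record: replace the faulty record value with a
    // fixed marker so sensitive payloads never reach the DLQ topic,
    // while keeping the original length visible for troubleshooting.
    static byte[] maskValue(byte[] value) {
        if (value == null) {
            return null; // e.g. punctuate-triggered errors carry no payload
        }
        byte[] masked = new byte[value.length];
        Arrays.fill(masked, (byte) '*');
        return masked;
    }

    public static void main(String[] args) {
        byte[] masked = maskValue("card=4111".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(masked, StandardCharsets.UTF_8)); // prints *********
    }
}
```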

15. Yeah, that's a good point. I was not fully convinced about putting
a String in it, and null is indeed a valid value. I assume that the
stacktrace and the exception are, in this case, the key metadata for
the user to troubleshoot the problem.
I updated the KIP to mention that the value should be null if the DLQ
record is triggered in a punctuate.
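
To illustrate (the `__streams.errors.*` header names and the plain `Map`
standing in for Kafka's `Headers` are illustrative assumptions, not the
names defined in the KIP), a metadata-only DLQ record for a punctuate
failure could carry the exception details in headers while the value
stays null:

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class PunctuateDlqSketch {
    // Sketch: collect the exception metadata for a DLQ record triggered
    // in a punctuate. There is no faulty payload, so the record value
    // would be null and the exception class + stacktrace travel in headers.
    static Map<String, byte[]> errorHeaders(Exception e) {
        Map<String, byte[]> headers = new HashMap<>();
        // "__streams.errors.*" names are placeholders, not the KIP's final ones
        headers.put("__streams.errors.exception",
                e.getClass().getName().getBytes(StandardCharsets.UTF_8));
        StringWriter stack = new StringWriter();
        e.printStackTrace(new PrintWriter(stack));
        headers.put("__streams.errors.stacktrace",
                stack.toString().getBytes(StandardCharsets.UTF_8));
        return headers;
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = errorHeaders(new IllegalStateException("boom"));
        byte[] value = null; // DLQ value stays null for punctuate-triggered errors
        System.out.println(new String(headers.get("__streams.errors.exception"),
                StandardCharsets.UTF_8)); // prints java.lang.IllegalStateException
    }
}
```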

16. I added a section to mention that Kafka Streams would not try to
create the topic itself; the topic should either be automatically
created by the broker, or pre-created.

17. If a DLQ record cannot be sent, the exception should go to the
uncaughtExceptionHandler. Let me state that clearly in the KIP.

On Fri, 12 Apr 2024 at 17:25, Damien Gasparina <d.gaspar...@gmail.com> wrote:
>
> Hi Nick,
>
> 1. Good point, that's less impactful than a custom interface, I just
> updated the KIP with the new signature.
>
> 1.a I really meant ProducerRecord, that's the class used to forward to
> downstream processors in the PAPI. The only information missing in
> this class is the topic name. I also considered relying on the Kafka
> Producer ProducerRecord, but I assume it would not be consistent with
> the KafkaStreams API.
>
> 2. Agreed
>
> 2.a I do think exceptions occurring during punctuate should be
> included in the DLQ.
> Even if building a suitable payload is almost impossible, even with
> custom code, those exceptions are still fatal for Kafka Streams by
> default and are something that cannot be ignored safely.
> I do assume that most users would want to be informed if an error
> happened during a punctuate, even if only the metadata (e.g.
> stacktrace, exception) is provided.
> I am only concerned about flooding the DLQ topic as, if a scheduled
> operation fails, it will very likely fail again during the next
> invocation.
>
> 4. Good point, I clarified the wording in the KIP to make it explicit.
>
> 5. Good point, I will clearly mention that it is out of scope as part
> of the KIP and might not be as trivial as people could expect. I will
> update the KIP once I do have some spare time.
>
> 6. Oh yeah, I didn't think about it, but forwarding input headers
> would definitely make sense. The Confluent Schema Registry ID is actually
> part of the payload, but many correlation IDs and technical metadata
> are passed through headers, so it makes sense to forward them, especially
> as it is the default behavior of Kafka Streams.
>
>
>
> On Fri, 12 Apr 2024 at 15:25, Nick Telford <nick.telf...@gmail.com> wrote:
> >
> > Hi Damien and Sebastien,
> >
> > 1.
> > I think you can just add a `String topic` argument to the existing
> > `withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]>
> > deadLetterQueueRecord)` method, and then the implementation of the
> > exception handler could choose the topic to send records to using whatever
> > logic the user desires. You could perhaps provide a built-in implementation
> > that leverages your new config to send all records to an untyped DLQ topic?
> >
> > 1a.
> > BTW you have a typo: in your DeserializationExceptionHandler, the type of
> > your `deadLetterQueueRecord` argument is `ProducerRecord`, when it should
> > probably be `ConsumerRecord`.
> >
> > 2.
> > Agreed. I think it's a good idea to provide an implementation that sends to
> > a single DLQ by default, but it's important to enable users to customize
> > this with their own exception handlers.
> >
> > 2a.
> > I'm not convinced that "errors" (e.g. failed punctuate) should be sent to a
> > DLQ topic like it's a bad record. To me, a DLQ should only contain records
> > that failed to process. I'm not even sure how a user would
> > re-process/action one of these other errors; it seems like the purview of
> > error logging to me?
> >
> > 4.
> > My point here was that I think it would be useful for the KIP to contain an
> > explanation of the behavior both with KIP-1033 and without it. i.e. clarify
> > if/how records that throw an exception in a processor are handled. At the
> > moment, I'm assuming that without KIP-1033, processing exceptions would not
> > cause records to be sent to the DLQ, but with KIP-1033, they would. If this
> > assumption is correct, I think it should be made explicit in the KIP.
> >
> > 5.
> > Understood. You may want to make this explicit in the documentation for
> > users, so they understand the consequences of re-processing data sent to
> > their DLQ. The main reason I raised this point is it's something that's
> > tripped me up in numerous KIPs and that committers frequently remind me
> > of; so I wanted to get ahead of it for once! :D
> >
> > And one new point:
> > 6.
> > The DLQ record schema appears to discard all custom headers set on the
> > source record. Is there a way these can be included? In particular, I'm
> > concerned with "schema pointer" headers (like those set by Schema
> > Registry), that may need to be propagated, especially if the records are
> > fed back into the source topics for re-processing by the user.
> >
> > Regards,
> > Nick
> >
> >
> > On Fri, 12 Apr 2024 at 13:20, Damien Gasparina <d.gaspar...@gmail.com>
> > wrote:
> >
> > > Hi Nick,
> > >
> > > Thanks a lot for your review and your useful comments!
> > >
> > > 1. It is a good point, as you mentioned, I think it would make sense
> > > in some use cases to have potentially multiple DLQ topics, so we
> > > should provide an API to let users do it.
> > > Thinking out loud here, maybe a better approach is to create a new
> > > Record class containing the topic name, e.g. DeadLetterQueueRecord, and
> > > to change the signature to
> > > withDeadLetterQueueRecords(Iterable<DeadLetterQueueRecord>
> > > deadLetterQueueRecords) instead of
> > > withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]>
> > > deadLetterQueueRecord). What do you think? DeadLetterQueueRecord would
> > > be something like "class DeadLetterQueueRecord extends
> > > org.apache.kafka.streams.processor.api.ProducerRecord { String
> > > topic; /* + getter/setter */ }"
> > >
> > > 2. I think the root question here is: should we have one DLQ topic or
> > > multiple DLQ topics by default. This question highly depends on the
> > > context, but implementing a default implementation to handle multiple
> > > DLQ topics would be opinionated, e.g. how to manage errors in a
> > > punctuate?
> > > I think it makes sense to have the default implementation write all
> > > faulty records to a single DLQ; that's at least the approach I used in
> > > past applications: one DLQ per Kafka Streams application. Of course
> > > the message format could change in the DLQ, e.g. due to the source
> > > topic, but those DLQ records will very likely be troubleshot, and
> > > maybe replayed, manually anyway.
> > > If a user needs to have multiple DLQ topics or wants to enforce a
> > > specific schema, it's still possible, but they would need to implement
> > > custom exception handlers.
> > > Coming back to 1. I do agree that it would make sense to have the user
> > > set the DLQ topic name in the handlers for more flexibility.
> > >
> > > 3. Good point, sorry it was a typo, the ProcessingContext makes much
> > > more sense here indeed.
> > >
> > > 4. I do assume that we could implement KIP-1033 (Processing exception
> > > handler) independently from KIP-1034. I do hope that KIP-1033 would be
> > > adopted and implemented before KIP-1034, but if that's not the case,
> > > we could implement KIP-1034 independently and update KIP-1033 to include
> > > the DLQ record afterward (in the same KIP or in a new one if not
> > > possible).
> > >
> > > 5. I think we should be clear that this KIP only covers the DLQ record
> > > produced.
> > > Everything related to replay messages or recovery plan should be
> > > considered out-of-scope as it is use-case and error specific.
> > >
> > > Let me know if that's not clear, there are definitely points that
> > > are highly debatable.
> > >
> > > Cheers,
> > > Damien
> > >
> > > On Fri, 12 Apr 2024 at 13:00, Nick Telford <nick.telf...@gmail.com> wrote:
> > > >
> > > > Oh, and one more thing:
> > > >
> > > > 5.
> > > > Whenever you take a record out of the stream, and then potentially
> > > > re-introduce it at a later date, you introduce the potential for record
> > > > ordering issues. For example, that record could have been destined for a
> > > > Window that has been closed by the time it's re-processed. I'd like to
> > > > see a section that considers these consequences, and perhaps make those risks
> > > > clear to users. For the record, this is exactly what sunk KIP-990, which
> > > > was an alternative approach to error handling that introduced the same
> > > > issues.
> > > >
> > > > Cheers,
> > > >
> > > > Nick
> > > >
> > > > On Fri, 12 Apr 2024 at 11:54, Nick Telford <nick.telf...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Damien,
> > > > >
> > > > > Thanks for the KIP! Dead-letter queues are something that I think a lot
> > > > > of users would like.
> > > > >
> > > > > I think there are a few points with this KIP that concern me:
> > > > >
> > > > > 1.
> > > > > It looks like you can only define a single, global DLQ for the entire
> > > > > Kafka Streams application? What about applications that would like to
> > > > > define different DLQs for different data flows? This is especially
> > > > > important when dealing with multiple source topics that have different
> > > > > record schemas.
> > > > >
> > > > > 2.
> > > > > Your DLQ payload value can either be the record value that failed, or an
> > > > > error string (such as "error during punctuate"). This is likely to cause
> > > > > problems when users try to process the records from the DLQ, as they can't
> > > > > guarantee the format of every record value will be the same. This is very
> > > > > loosely related to point 1. above.
> > > > >
> > > > > 3.
> > > > > You provide a ProcessorContext to both exception handlers, but state they
> > > > > cannot be used to forward records. In that case, I believe you should use
> > > > > ProcessingContext instead, which statically guarantees that it can't be
> > > > > used to forward records.
> > > > >
> > > > > 4.
> > > > > You mention the KIP-1033 ProcessingExceptionHandler, but what's the plan
> > > > > if KIP-1033 is not adopted, or if KIP-1034 lands before 1033?
> > > > >
> > > > > Regards,
> > > > >
> > > > > Nick
> > > > >
> > > > > On Fri, 12 Apr 2024 at 11:38, Damien Gasparina <d.gaspar...@gmail.com>
> > > > > wrote:
> > > > >
> > > > >> In a general way, if the user does not configure the right ACL, that
> > > > >> would be a security issue, but that's true for any topic.
> > > > >>
> > > > >> This KIP allows users to configure a Dead Letter Queue without writing
> > > > >> custom Java code; the configuration is done in Kafka Streams, not at the
> > > > >> topic level.
> > > > >> A lot of applications are already implementing this pattern, but the
> > > > >> required code to do it is quite painful and error prone, for example
> > > > >> most apps I have seen created a new KafkaProducer to send records to
> > > > >> their DLQ.
> > > > >>
> > > > >> As it would be disabled by default for backward compatibility, I doubt
> > > > >> it would generate any security concern.
> > > > >> If a user explicitly configures a Dead Letter Queue, it would be up to
> > > > >> them to configure the relevant ACLs to ensure that the right principal
> > > > >> can access it.
> > > > >> It is already the case for all internal, input and output Kafka
> > > > >> Streams topics (e.g. repartition, changelog topics) that also could
> > > > >> contain confidential data, so I do not think we should implement a
> > > > >> different behavior for this one.
> > > > >>
> > > > >> In this KIP, we configured the default DLQ record to have the initial
> > > > >> record key/value as we assume that it is the expected and wanted
> > > > >> behavior for most applications.
> > > > >> If a user does not want to have the key/value in the DLQ record for
> > > > >> any reason, they could still implement exception handlers to build
> > > > >> their own DLQ record.
> > > > >>
> > > > >> Regarding ACLs, maybe something smarter could be done in Kafka Streams,
> > > > >> but this is out of scope for this KIP.
> > > > >>
> > > > >> On Fri, 12 Apr 2024 at 11:58, Claude Warren <cla...@xenei.com> wrote:
> > > > >> >
> > > > >> > My concern is that someone would create a dead letter queue on a
> > > > >> > sensitive topic and not get the ACL correct from the start, thus
> > > > >> > causing a potential confidential data leak.  Is there anything in
> > > > >> > the proposal that would prevent that from happening?  If so, I did
> > > > >> > not recognize it as such.
> > > > >> >
> > > > >> > On Fri, Apr 12, 2024 at 9:45 AM Damien Gasparina <
> > > d.gaspar...@gmail.com
> > > > >> >
> > > > >> > wrote:
> > > > >> >
> > > > >> > > Hi Claude,
> > > > >> > >
> > > > >> > > In this KIP, the Dead Letter Queue is materialized by a standard
> > > > >> > > and independent topic, thus normal ACLs apply to it like any other
> > > > >> > > topic.
> > > > >> > > This should not introduce any security issues, obviously, the
> > > right
> > > > >> > > ACL would need to be provided to write to the DLQ if configured.
> > > > >> > >
> > > > >> > > Cheers,
> > > > >> > > Damien
> > > > >> > >
> > > > >> > > On Fri, 12 Apr 2024 at 08:59, Claude Warren, Jr
> > > > >> > > <claude.war...@aiven.io.invalid> wrote:
> > > > >> > > >
> > > > >> > > > I am new to the Kafka codebase so please excuse any ignorance
> > > on my
> > > > >> part.
> > > > >> > > >
> > > > >> > > > When a dead letter queue is established, is there a process to
> > > > >> > > > ensure that it at least is defined with the same ACL as the
> > > > >> > > > original queue? Without such a guarantee at the start, it seems
> > > > >> > > > that managing dead letter queues will be fraught with security
> > > > >> > > > issues.
> > > > >> > > >
> > > > >> > > >
> > > > >> > > > On Wed, Apr 10, 2024 at 10:34 AM Damien Gasparina <
> > > > >> d.gaspar...@gmail.com
> > > > >> > > >
> > > > >> > > > wrote:
> > > > >> > > >
> > > > >> > > > > Hi everyone,
> > > > >> > > > >
> > > > >> > > > > To continue our effort to improve Kafka Streams error handling,
> > > > >> > > > > we propose a new KIP to add out-of-the-box support for Dead
> > > > >> > > > > Letter Queues.
> > > > >> > > > > The goal of this KIP is to provide a default implementation
> > > > >> > > > > that should be suitable for most applications and allow users
> > > > >> > > > > to override it if they have specific requirements.
> > > > >> > > > >
> > > > >> > > > > In order to build a suitable payload, some additional changes
> > > > >> > > > > are included in this KIP:
> > > > >> > > > >   1. extend the ProcessingContext to hold, when available, the
> > > > >> > > > > source node raw key/value byte[]
> > > > >> > > > >   2. expose the ProcessingContext to the ProductionExceptionHandler,
> > > > >> > > > > as it is currently not available in the handle() parameters.
> > > > >> > > > >
> > > > >> > > > > Regarding point 2.,  to expose the ProcessingContext to the
> > > > >> > > > > ProductionExceptionHandler, we considered two choices:
> > > > >> > > > >   1. exposing the ProcessingContext as a parameter in the handle()
> > > > >> > > > > method. That's the cleanest way IMHO, but we would need to
> > > > >> > > > > deprecate the old method.
> > > > >> > > > >   2. exposing the ProcessingContext as an attribute in the
> > > > >> > > > > interface. This way, no method is deprecated, but we would not
> > > > >> > > > > be consistent with the other ExceptionHandlers.
> > > > >> > > > >
> > > > >> > > > > In the KIP, we chose solution 1. (new handle signature with
> > > > >> > > > > the old one deprecated), but we could use other opinions on
> > > > >> > > > > this part.
> > > > >> > > > > More information is available directly on the KIP.
> > > > >> > > > >
> > > > >> > > > > KIP link:
> > > > >> > > > >
> > > > >> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams
> > > > >> > > > >
> > > > >> > > > > Feedback and suggestions are welcome,
> > > > >> > > > >
> > > > >> > > > > Cheers,
> > > > >> > > > > Damien, Sebastien and Loic
> > > > >> > > > >
> > > > >> > >
> > > > >> >
> > > > >> >
> > > > >> > --
> > > > >> > LinkedIn: http://www.linkedin.com/in/claudewarren
> > > > >>
> > > > >
> > >
